1. 前言
安裝使用Go SDK可以幫助開發者快速接入使用天翼云的日志服務相關功能,目前支持同步上傳,異步批量上傳等功能。
2. 使用條件
2.1. 先決條件
用戶需要具備以下條件才能夠使用云日志服務 SDK Go版本:
1、已開通云日志服務,并創建了日志項目和日志單元,獲取到相應編碼(logProject、logUnit)。
2、已獲取AccessKey 和 SecretKey。
3、已安裝Go開發環境,推薦安裝Go1.19或以上版本。
2.2. 下載及安裝
下載ctyun_lts_go_sdk.zip壓縮包,放到相應位置后并解壓。“ctyun_lts_go_sdk”目錄中“example”為SDK的使用示例代碼。
1、將解壓后的代碼包的"lts"目錄整個復制到您的項目中。
2、更新依賴項,在您的go項目中執行命令:
go mod tidy
3、當您的代碼中要使用SDK時,在代碼的import中,添加lts包。假設您的項目為lts_sdk_go_demo,則進行如下操作。這樣您就可以順利使用SDK的功能:
import(
? "lts_sdk_go_demo/lts"
)
如果您想直接使用SDK,則可以進行如下步驟處理:
4、進入到ctyun_lts_go_sdk 目錄下,刪除go.mod 、go.sum文件。
5、執行命令,初始化您的項目:
go mod init ctyun_lts_go_sdk ?
6、接著執行命令:
go mod tidy
7、進入example目錄下,運行sample_putlogs.go示例
go run sample_putlogs.go
8、或者進入example目錄下,構建您的go項目,執行命令,以生成二進制文件。
go build -o sample_putlogs sample_putlogs.go ? ? ? (linux 環境)
go build -o sample_putlogs.exe sample_putlogs.go ? (windows 環境)
9、運行sample_putlogs示例:
./sample_putlogs ? ? ? ? (linux 環境)
sample_putlogs.exe ? ? ? (windows 環境)
3. SDK基本使用
3.1. 基本使用
使用 SDK訪問云日志服務,需要設置正確的 AccessKey、SecretKey 和endpoint,所有的服務可以使用同一 key 憑證來進行訪問,但不同的服務地區需要使用不同的 endpoint 進行訪問,詳情參考天翼云官網-SDK接入概述。在調用前SDK,需要已知以下參數:
- 云日志服務訪問地址。詳情請查看訪問地址(Endpoint)。
- key憑證:accessKey和secretKey 。詳情請查看如何獲取訪問密鑰(AK/SK)。
- 日志項目編碼:logProject,在使用SDK前,需要確保您有至少一個已經存在的日志項目。
- 日志單元編碼:logUnit,在使用SDK前,需要確保日志項目中有至少一個已經存在的日志單元。
- 待上傳的日志:logItem,在使用SDK前,需要確保日志已按照特定格式組織。
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | 域名 | 是 |
| accessKey | string | AccessKey,簡稱ak | 是 |
| secretKey | string | SecretKey ,簡稱sk | 是 |
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
目前通過SDK將日志上傳有兩種上傳形式:同步上傳和異步批量上傳。
1、同步上傳:當調用日志上傳接口時,sdk會立即進行http請求調用,并返回發送結果。這種方式結構簡單,可用于發送頻率不高的場景。
2、異步批量上傳:當調用日志上傳接口時,后臺線程會將日志進行累積,當達到發送條件時,會進行一次合并發送。對于需要頻繁調用發送接口的場景,這種方式性能更卓越,更高效。
示例代碼:同步上傳
import (
"ctyun_lts_go_sdk/lts"
"fmt"
"time"
)
func main() {
ak := "your accessKey"
sk := "your secretKey"
logProject := "log project Code"
logUnit := "log unit Code"
endpoint := "//guizhou-lts.daliqc.cn"
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
client, err := lts.CreateClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
logTimestamp := time.Now().UnixNano()
originMsg := "go sdk test oriMessage"
contents := make(map[string]any)
contents["contentInt"] = logTimestamp
contents["contentString"] = "contents test string"
contents["contentDouble"] = 3.1415926
contents["contentBool"] = "true"
labels := make(map[string]any)
labels["user_tag"] = "string"
logItem := lts.GenerateLogItem(logTimestamp, originMsg, contents, labels)
logItems := lts.NewLogItems()
for i := 0; i < 10; i++ {
logItems.AddLogItem(logItem)
}
for j := 0; j < 100; j++ {
response, err := client.PutLogs(logProject, logUnit, logItems)
if err != nil {
fmt.Println("Put log failed: ", err)
} else {
fmt.Println("Response message: ", response.Message)
}
}}
示例代碼:異步批量上傳
func main() {
ak := "your accessKey"
sk := "your secretKey"
logProject := "log project Code"
logUnit := "log unit code"
endpoint := "your endpoint"
producerConfig := lts.GetDefaultProducerConfig()
producerConfig.AllowLogLevel = "warn"
producer := lts.InitProducer(producerConfig)
// 可以根據clientConfig 創建多個client,client間互不影響
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
err := producer.BuildClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
producer.Start()
for i := 0; i < 1000; i++ { //直接發送,內部異步批量發送
err := producerInstance.SendLogs(logProject, logUnit, logItem)
if err != nil {
fmt.Println(err)
}
}
time.Sleep(10 * time.Second)
producer.SafeClose()
}
4. 服務代碼示例-同步上傳
4.1. 關于Client的操作
4.1.1. GetDefaultClientConfig()
此操作是獲取Client的默認配置。至少需要4個關鍵的參數,config配置如下:
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| Endpoint | string | 域名 | 是 |
| AccessKey | string | AccessKey,簡稱ak | 是 |
| SecretKey | string | SecretKey ,簡稱sk | 是 |
| LogProject | string | LogProject,日志項目編碼 | 是 |
| UserAgent | string | lts-sdk-go/{go版本信息} | 否 |
| RequestTimeOut | time.Duration | 請求超時時間,默認60s | 否 |
| RetryTimeOut | time.Duration | 重試超時時間,默認90s | 否 |
| CompressType | string | 日志壓縮方式,默認lz4 | 否 |
示例代碼:獲取ClinetConfig配置
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
//也可以進行自定義參數值
clientConfig.RequestTimeOut=30 * time.Second
clientConfig.UserAgent=”log-sdk-go/1.6.0’
4.1.2. CreateClient
此操作是根據clientConfig創建一個Client。之后,可以進行Client初始化,其中有三個主要步驟:初始化client示例、初始化HTTPClient用于發送http請求、通過accessKey和secretKey獲取臨時憑證Token,關于HTTPClient中的參數,用戶也可以根據項目的需要自行修改。
示例代碼:初始化生成Client
//初始化 Client
client, err := lts.CreateClient(clientConfig)
//CreateClient()的實現
func CreateClient(clientConfig *ClientConfig) (*Client, error) {
client := &Client{
Endpoint: clientConfig.Endpoint,
AccessKey: clientConfig.AccessKey,
SecretKey: clientConfig.SecretKey,
CompressType: clientConfig.CompressType,
RequestTimeOut: clientConfig.RequestTimeOut,
RetryTimeOut: clientConfig.RetryTimeOut,
UserAgent: clientConfig.UserAgent,
HTTPClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 90 * time.Second,
}).DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
},
}
if err := CheckConfig(client); err != nil {
return nil, err
}
err := client.SetToken()
if err != nil {
return nil, err
}
return client, nil
}
4.2. 關于臨時憑證Token的操作
4.2.1. akskToToken()
此操作是為client注入token信息,這一步需要使用ak和sk信息換取臨時憑證TokenInfo,其中包含了token隨機串和過期時間兩個參數。這一步需要去訪問CTIAM的api接口,調用api接口,傳入ak/sk/endpoint信息,返回token信息。
TokenInfo信息如下:
| 參數 | 類型 | 描述 |
|---|---|---|
| Token | string | token 隨機串 |
| ExpireTime | int64 | 過期時間,默認30分鐘 |
獲取TokenInfo這一步在Client 初始化時會自動調用akskToToken,用戶默認可以不用進行這一步操作。
示例代碼:使用ak,sk去獲取臨時憑證token
func (c *Client) SetToken() (err error) {
tokenInfo, err := akskToToken(c.Config.AccessKey, c.Config.SecretKey, c.Config.Endpoint)
if err != nil {
return err
}
c.SecurityToken = tokenInfo
return nil
}
4.3. 關于Log的操作
4.3.1. logItems.AddLogItem(logItem)
此操作用于生成待上傳的日志,日志上傳只能上傳LogItem格式的日志,logItems是一個數組類型,里面包含若干條LogItem日志,格式如下:
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| logItems | []LogItem | LogItem格式的數組,將多份日志組合起來發送 | 是 |
其中LogItem類型需要的參數如下:
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| LogTimestamp | int | 時間戳,單位納秒 | 是 |
| OriginMsg | string | 原始日志內容 | 是 |
| Contents | map[string]any | 日志內容,分詞后的內容 | 否 |
| Labels | map[string]any | 自定義標簽 | 否 |
注意:其中Contents和Labels的key的長度不超過64字符,僅支持數字、字母、下劃線、連字符(-)、點(.),且必須以字母開頭。value類型最好使用字符串(string)和數字類型(int,double),其他類型建議先轉為字符串類型,并且value值不能為空或空字符串。
示例代碼:組裝生成1條日志
logTimestamp := time.Now().UnixNano()
originMsg := "go sdk test oriMessage"
contents := make(map[string]any)
contents["contentInt"] = logTimestamp
contents["contentString"] = "contents test string"
contents["contentDouble"] = 3.1415926
contents["contentBool"] = "true"
labels := make(map[string]any)
labels["user_tag"] = "string"
logItem := lts.GenerateLogItem(logTimestamp, originMsg, contents, labels)
logItems := lts.NewLogItems()
logItems.AddLogItem(logItem)
//如果只需要日志原文,構造一條日志
logItem := lts.GenerateLogItemWithM(originMsg)
//如果只需要日志原文和分詞,構造一條日志
logItem := lts.GenerateLogItemWithMC(originMsg,contents)
4.4. 關于日志上傳的操作
4.4.1. PutLogs()
此操作用于日志上傳服務,需要傳入的參數有三個,分別是logProject(日志項目編碼),logUnit(日志單元編碼),logItems(要上傳的日志)。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
| logItems | []LogItem | 日志信息 | 是 |
示例代碼:上傳日志,默認使用lz4壓縮
response, err := client.PutLogs(logProject, logUnit, logItems)
if err != nil {
? ? fmt.Println("Put log failed: ", err.Error())
} else {
? ? fmt.Println("message: ", response.Message)
}
response 是Response格式的返回響應體,如下表格式:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| statusCode | int64 | 返回碼,取值范圍:0:-正常、-1:嚴重錯誤,其他自定義錯誤碼 | |
| message | string | 狀態描述 | SUCCESS |
| error | string | 參考錯誤編碼列表 |
日志服務相關錯誤編碼(部分):
| statusCode | error | message |
|---|---|---|
| -1 | LTS_8000 | 請求失敗,請稍候重試,或提交工單反饋 |
| -1 | LTS_8001 | 內容不合法,無法解析 |
| -1 | LTS_8004 | 日志內容包含的日志必須小于[x] MB和[y]條 |
| -1 | LTS_8006 | 日志內容解壓失敗 |
| -1 | LTS_8007 | Token失效,請重新獲取 |
| -1 | LTS_8009 | 無云日志服務產品實例,請先開通云日志服務 |
| -1 | LTS_8010 | 日志項目不存在 |
| -1 | LTS_8011 | 日志單元不存在 |
| -1 | LTS_8013 | 在1個日志項目下,寫入流量最大限制:200MB/s |
| -1 | LTS_8014 | 在1個日志項目下,寫入次數最大限制:1000次/s |
| -1 | LTS_8015 | 在1個日志單元下,寫入流量最大限制:100MB/s |
| -1 | LTS_8016 | 在1個日志單元下,寫入次數最大限制:500次/s |
| -1 | LTS_18000 | 調用ITIAM的接口失敗 |
4.5. 關于日志消費的操作
4.5.1. GetCursor()
此操作是獲取云日志服務上某個時間點的cursor游標,后面日志消費需要使用到這個游標。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
| from | string | 起始時間,可為"begin"/ "and" / "2024-10-01 10:10:10" | 是 |
示例代碼:獲取from時間點的游標
// 創建一個 CursorRequest 實例
cursorRequest := <s.CursorRequest{
ProjectCode: logProject,
UnitCode: ? logUnit,
From: ? ? ? from,
}
response, err := client.GetCursor(cursorRequest)
if err != nil {
fmt.Println("Get Cursor failed: ", err)
} else {
fmt.Println("statusCode:", response.StatusCode, ", cursor:", response.ReturnObj, ",message:", response.Message, ",error:", response.Error)
}
response 是CursorResponse格式的返回響應體,如下表格式:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| RequestId | string | 請求Id, | |
| StatusCode | int | 返回碼,取值范圍:0:-正常、-1:嚴重錯誤,其他自定義錯誤碼 | |
| Message | string | 狀態描述 | |
| Error | string | 參考錯誤編碼列表 | |
| ReturnObj | int | 游標結果 | 362539770003324928 |
4.5.2. PullLogs()
此操作是根據日志的游標拉取云日志服務日志。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
| start | uint64 | 起始游標 | 是 |
| end | uint64 | 結束游標,填0表示空值入參 | 是 |
| count | uint32 | 拉取的日志數量 | 是 |
示例代碼:拉取游標處的日志
pullLogRequest := <s.PullLogRequest{
LogProject: logProject,
LogUnit: ? logUnit,
Start: ? ? start,
End: ? ? ? end,
Count: ? ? count,
}
response, err := client.PullLogs(pullLogRequest)
if err != nil {
fmt.Println("Pull log failed: ", err)
} else {
fmt.Println("statusCode:", response.StatusCode, ", rows:", response.ReturnObj.Rows, ",message:", response.Message, ",error:", response.Error)
}
response 是PullLogResponse格式的返回響應體,如下表格式:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| RequestId | string | 請求Id, | |
| StatusCode | int | 返回碼,取值范圍:0:-正常、-1:嚴重錯誤,其他自定義錯誤碼 | |
| Message | string | 狀態描述 | |
| Error | string | 參考錯誤編碼列表 | |
| ReturnObj | Logs | 日志數據 |
ReturnObj 是Log格式的返回響應體,如下表格式:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| Rows | []map[string]any | 里面是類似kv類型的鍵值對,是云日志服務上的日志消息 | |
| Next | int64 | 返回碼,取值范圍:0:-正常、-1:嚴重錯誤,其他自定義錯誤碼 | |
| Status | status | 狀態描述 |
5. 服務代碼-異步批量上傳
異步上傳是為了解決同步上傳無法高頻異步發送等問題所增加的模塊。原理是會開啟多個協程,當調用日志發送接口后,會立刻返回,而內部的協程會將日志數據緩存合并,最后進行批量發送。異步上傳特點如下:安全設計、高效異步傳輸、智能重試機制、詳盡的行為跟蹤 、優雅關閉流程等
性能卓越,在面臨海量數據和高資源壓力的場景下,producer 憑借多協程、智能緩存和批量發送等高級功能,幫助用戶輕松達到目標吞吐量,同時簡化了程序設計和開發流程。
異步處理優勢,在內存資源充足的情況下,producer 的異步發送機制使得用戶調用 sendLogs 方法時無需等待,實現了計算與 I/O 邏輯的有效分離。用戶可以通過 callback 了解日志發送狀態。
5.1. 關于Producer操作
5.1.1. GetDefaultProducerConfig()
此操作是獲取producer的默認的配置文件。producer可以看作是一個啟動器,內部封裝了異步協程的初始化、啟動和關閉等功能,只需要對producer進行操作,即可安全便捷地控制這些異步的協程。使用這份producerConfig配置去初始化一個producer。
producerConfig := lts.GetDefaultProducerConfig()
producerConfig.AllowLogLevel = "warn"
producerConfig.LingerMs = 2000
...
producer := lts.InitProducer(producerConfig)
producerConfig內的屬性是異步操作中的線程所需要的參數,如果不設置參數,則初始化的時候會使用默認的參數,默認參數如下所示:
| 參數 | 類型 | 描述 |
|---|---|---|
| TotalSizeLnBytes | Int64 | 單個 producer 實例能緩存的日志大小上限,默認為 100MB。 |
| MaxIoWorkerCount | Int64 | 單個producer能并發的最多groutine的數量,默認為50,該參數用戶可以根據自己實際服務器的性能去配置。 |
| MaxBlockSec | Int | 如果 producer 可用空間不足, send 方法的最大阻塞時間,默認為60秒。如果超過這個時間后空間仍無法得到滿足,send 方法會拋出TimeoutException。如果將s值設為0,當所需空間無法得到滿足時,send 方法會立即拋出 TimeoutException。如果希望 send 方法一直阻塞直到所需空間得到滿足,可設為負數。 |
| MaxBatchSize | Int64 | 當一個 ProducerBatch 中緩存的日志大于等于batchSizeThresholdInBytes 時,該 batch 將被發送,默認為 512 KB,最大5MB。 |
| MaxBatchCount | Int | 當一個 ProducerBatch 中緩存的日志條數大于等于 batchCountThreshold 時,該 batch 將被發送,默認為 4096,最大40960。 |
| LingerMs | Int64 | 一個 ProducerBatch 從創建到可發送的逗留時間,默認為 2 秒,最小可設置成 100 ms。 |
| Retries | Int | 如果某個 ProducerBatch 首次發送失敗,能夠對其重試的次數,默認為 10 次。 如果 retries 小于等于 0,該 ProducerBatch 首次發送失敗后將直接進入失敗隊列。 |
| MaxReservedAttempts | Int | 每個 ProducerBatch 每次被嘗試發送都對應著一個 Attemp,此參數用來控制返回給用戶的 attempt 個數,默認只保留最近的 11 次 attempt 信息。該參數越大能讓您追溯更多的信息,但同時也會消耗更多的內存。 |
| BaseRetryBackoffMs | Int64 | 首次重試的退避時間,默認為 100 毫秒。 Producer 采樣指數退避算法,第 N 次重試的計劃等待時間為 baseRetryBackoffMs * 2^(N-1)。 |
| MaxRetryBackoffMs | Int64 | 重試的最大退避時間,默認為 50 秒。 |
| AllowLogLevel | String | 設置日志輸出級別分別為debug,info,warn和error,默認值是warn, |
| LogFileName | String | 日志文件輸出路徑,默認輸出到stdout。 |
| IsJsonType | Bool | 是否格式化文件輸出格式,默認為false。 |
| LogMaxSize | Int | 單個日志存儲數量,默認為10M。 |
| LogMaxBackups | Int | 日志輪轉數量,默認為10。 |
| NoRetryStatusCodeList | []int | 用戶配置的不需要重試的錯誤碼列表,當發送日志失敗時返回的錯誤碼在列表中,則不會重試。默認包含400,403,404。 |
5.1.2. BuildClient()
此操作是根據clientConfig配置初始化client,它會根據配置中的logProject屬性去初始化一個client,不同的logProject會構建不同的client,使用不同的配置就可以構建多個client,每個client負責該project項目下的日志發送任務。
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
clientConfig2 := lts.GetDefaultClientConfig(endpoint, ak2, sk2, logProject2)
err := producerInstance.BuildClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
err = producerInstance.BuildClient(clientConfig2)
if err != nil {
fmt.Println("client2 initialization failed, err:", err)
return
}
5.1.3. Close()
此操作是用于關閉producer。當不再需要發送數據或當前進程即將終止時,關閉producer是必要的步驟,以確保producer中緩存的所有數據都能得到妥善處理。當前,producer提供了兩種關閉模式:安全關閉和有限關閉。
安全關閉模式確保在關閉producer之前,所有緩存的數據都已完成處理,所有相關協程都已關閉,并且所有注冊的回調函數都已執行完畢。一旦producer被安全關閉,緩存的批次數據會立即得到處理,并且不會被重試。如果回調函數沒有被阻塞,close方法通常能夠迅速返回。
producer.SafeClose()
有限關閉模式適用于那些可能存在阻塞回調函數的場景,但您又希望close方法能在指定的時間內返回。有限關閉會接收用戶傳遞的一個參數值,時間單位為秒,當開始關閉producer的時候開始計時,超過傳遞的設定值還未能完全關閉producer的話會強制退出producer,此時可能會有部分數據未被成功發送而丟失。
producer.close(60)
5.2. 關于異步發送操作
5.2.1. SendLogs()
此操作是將日志發送到后臺的日志累加器隊列中,然后立刻返回。累加器的狀態達到可發送條件時(日志量達到閾值或者等待時間達到閾值),后臺任務的線程將里面的日志進行打包批量發送。
producer.sendLogs(logProject, logUnit, logItem)
sendLogs()方法有很多同類方法,可以滿足多種類型的發送,既可以發送單條日志,也可以發送多條日志,同時也可以根據需求是否需要結果返回值。類型如下:
sendLogs(logProject string, logUnit string, logItem *LogItem)
sendLogList(logProject string, logUnit string, logList []*LogItem)
sendLogsWithCallBack(logProject string, logUnit string, logItem *LogItem)
sendLogsListWithCallBack(logProject string, logUnit string, logList []*LogItem)
5.3. 關于獲取發送結果的操作
5.3.1. Callback
在調用sendLogsWithCallBack方法時注冊 callback 獲取數據發送結果,代碼片段如下。(可根據需求自行修改, io_worker.go)
func (callback *Callback) Success(result *Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Println("success ,statusCode:", attempt.StatusCode, ",message:", attempt.ErrorMessage, ",error:", attempt.ErrorCode)
}
}
func (callback *Callback) Fail(result *Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Println("fail , statusCode:", attempt.StatusCode, ",message:", attempt.ErrorMessage, ",error:", attempt.ErrorCode)
}
}