1. 前言
安裝使用Go SDK可以幫助開發者快速接入并使用天翼云的日志服務相關功能,目前支持同步上傳,異步批量上傳等功能。
2. 使用條件
2.1. 先決條件
用戶需要具備以下條件才能夠使用LTS SDK Java版本:
1、購買并訂閱了天翼云的云日志服務,并創建了日志項目和日志單元,獲取到相應編碼(logProject、logUnit)。
2、已獲取AccessKey 和 SecretKey。
3、已安裝JDK1.8及以上環境。
2.2. 下載及安裝
下載ctyun_lts_java_sdk.zip壓縮包,放到相應位置后并解壓,把包放在本地目錄:<base_path>。如果您想直接使用SDK,可以不做修改,直接使用SDK源碼,示例代碼為example/SamplePutlogs.java。。
- 把SDK源碼構建成jar包,可通過構建工具構建。或者直接使用已有jar包(target目錄下):”ctyun-lts-java-sdk-1.6.0.jar”。
- 把生成的jar包引入本地maven倉庫。可以通過例如idea的maven工具install 到maven倉庫。或者通過命令構建安裝(在jar包所在目錄執行下面命令)。
mvn install:install-file -Dfile=ctyun-lts-java-sdk-1.6.0.jar -DgroupId=cn.ctyun.lts -DartifactId=ctyun-lts-java-sdk -Dversion=1.6.0 -Dpackaging=jar
- 在您的maven工程的pom.xml文件中增加配置
<dependency>
<groupId>cn.ctyun.lts</groupId>
<artifactId>ctyun-lts-java-sdk</artifactId>
<version>1.6.0</version>
</dependency>
- 引入第三方依賴包
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83_noneautotype</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>1.1.0-alpha</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.23.4</version>
</dependency>
3. SDK基本使用
3.1. 基本使用
使用 SDK訪問 LTS 的服務,需要設置正確的 AccessKey、SecretKey 和服務端 Endpoint,所有的服務可以使用同一 key 憑證來進行訪問,但不同的服務需要使用不同的 endpoint 進行訪問,詳情參考天翼云官網-SDK接入概述。在調用前SDK,需要已知以下參數:
1、云日志服務訪問地址。詳情請查看訪問地址(Endpoint)。
2、key憑證:accessKey和secretKey 。詳情請查看如何獲取訪問密鑰(AK/SK)。
3、日志項目編碼:logProject,在使用SDK前,需要確保您有至少一個已經存在的日志項目,日志項目就是您要將日志上傳到的地方。
4、日志單元編碼:logUnit,在使用SDK前,需要確保日志項目中有至少一個已經存在的日志單元。
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | 域名 | 是 |
| accessKey | string | AccessKey,簡稱ak | 是 |
| secretKey | string | SecretKey ,簡稱sk | 是 |
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
目前通過SDK將日志上傳到云日志服務有兩種上傳形式:同步上傳和異步上傳。
1、同步上傳:當調用日志上傳接口時,sdk會立即進行http請求調用,并返回發送結果。這種方式結構簡單,可用于發送頻率不高的場景。
2、異步上傳:當調用日志上傳接口時,后臺線程會將日志進行累積,當達到發送條件時,會進行一次合并發送。對于需要頻繁調用發送接口的場景,這種方式性能更卓越,更高效。
示例代碼:同步上傳
public static void main(String args[]) throws LogException, InterruptedException {
? ? ? String accessKey = "your accessKey";
? ? ? String secretKey = "your secretKey";
? ? ? String logProject = "log project Code";//日志項目ID
? ? ? String logUnit = "log unit Code";//日志單元ID
? ? ? String endpoint = "endpoint";
? ? ? Client client = new Client(endpoint, accessKey,secretKey);
? ? ? //構建日志
? ? ? LogItem logItem = new LogItem(System.currentTimeMillis());
? ? ? logItem.setOrigin_msg("le sync ,java, test message!"); //日志原文
? ? ? logItem.PushBackContent("level", "info"); //日志分詞
? ? ? logItem.PushBackLabel("usage_tag", "string");
? ? ? LogItems logItems = new LogItems();
? ? ? logItems.add(logItem);
? ? ? try {
? ? ? ? ? for (int i = 0; i < 100; i++) { //發送100次
PutLogsResponse response = client.PutLogs(logProject, logUnit, logItems, "");
PutLogsResponseBody res = response.getBody();
System.out.println("response: statusCode:"+res.getStatusCode()+" ,message"+res.getMessage()+" ,error:"+res.getError());
}
} catch (LogException e) {
System.out.println("error :" + e.getErrorCode() + " , " + e.getMessage() + " , " + e.getHttpCode());
e.printStackTrace();
} catch (Exception ex) {
ex.printStackTrace();
}
? }
示例代碼:異步批量上傳
public static void main(String[] args) throws Exception {
String accessKey = "your accessKey";
String secretKey = "your secretKey";
String logProject = "log project Code";//日志項目ID
String logUnit = "log unit Code";//日志單元ID
String endpoint = "endpoint";
// 初始化日志生產者
Producer producer = new LogProducer(new ProducerConfig());
producer.buildClient(logProject, endpoint, accessKey, secretKey);
int task = 10; // 要發送的日志任務數
// 提交日志發送任務到線程池
for (int i = 0; i < task; ++i) {
EXECUTOR_SERVICE.submit(
new Runnable() {
@Override
public void run() {
LogItem logItem = Utils.generateLogItem(1);
try {
// 發送日志,使用自定義回調函數
producer.sendLogs(logProject, logUnit, logItem, customCallback());
} catch (InterruptedException e) {
LOGGER.warn("線程在發送日志時被中斷。");
} catch (Exception e) {
LOGGER.error("發送日志失敗,日志項: {}", logItem, e);
}
}
});
}
// 稍作等待后關閉生產者
Thread.sleep(5000);
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("關閉生產者時線程被中斷。");
} catch (ProducerException e) {
LOGGER.info("關閉生產者失敗: {}", e);
}
// 等待所有任務完成
EXECUTOR_SERVICE.shutdown();
}
//回調函數
private static Callback customCallback() {
Callback callback = new Callback() {
@Override
public void onCompletion(Result result) {
// 處理回調結果
if (result.isSuccessful()) {
LOGGER.info("response: statusCode:{} , message:{} , errorCode: {}",result.getStatusCode(), result.getErrorMessage(), result.getErrorCode());
} else {
LOGGER.info("response: statusCode:{} , message:{} , errorCode: {}",result.getStatusCode(), result.getErrorMessage(), result.getErrorCode());
}
}
};
return callback;
}
4. 服務代碼示例-同步上傳
4.1. 關于Client的操作
4.1.1. New Client()
此操作是初始化Client。用戶需要配置至少3個關鍵的參數才夠初始化Client。
| 參數 | 參數類型 | 描述 | 是否必須 |
|---|---|---|---|
| endpoint | string | 域名 | 是 |
| accessKey | string | AccessKey,簡稱ak | 是 |
| secretKey | string | SecretKey ,簡稱sk | 是 |
示例代碼:初始化Client配置
Client client = new Client(endpoint, accessKey,secretKey);
4.2. 關于Log的操作
4.2.1. logData.PushBackContent(logItem)
此操作用于生成待上傳的日志,日志上傳只能上傳LogItem格式的日志,logData包含了一個ArrayList
LogItem類型的日志格式如下:
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| log_timestamp | long | 時間戳,單位納秒 | 是 |
| origin_msg | string | 原始日志內容 | 是 |
| content | ArrayList |
分詞后的日志內容,可用于索引 | 否 |
| labels | ArrayList |
自定義標簽 | 否 |
其中LogContent 和Labels是包含了兩個成員變量的類,成員變量為key:string,value:T。
示例代碼:組裝生成1條日志
LogData logData = new LogData();
LogItem logItem = new LogItem(System.currentTimeMillis());
logItem.setOrigin_msg("java, test message!");
logItem.PushBackContent("level", "info");
logItem.PushBackContent("unit_id", "12345678");
logItem.PushBackContent("area", 1.3145);
logItem.PushBackLabel("usage_tag", "string");
logData.add(logItem);
注意:其中content和labels的key的長度不超過64字符,僅支持數字、字母、下劃線、連字符(-)、點(.),且必須以字母開頭。value類型最好使用字符串(String)和數字類型(int,double),其他類型建議先轉為字符串類型,并且value值不能為空或空字符串。
4.3. 關于日志上傳的操作
4.3.1. PutLogs()
此操作用于日志上傳服務,需要傳入的參數有四個,分別是logProject(日志項目編碼),logUnit(日志單元編碼),logData(要上傳的日志),source(日志來源)。
| 參數 | 類型 | 描述 | 是否必須 |
|---|---|---|---|
| logProject | string | 日志項目編碼 | 是 |
| logUnit | string | 日志單元編碼 | 是 |
| logData | vector |
日志信息 | 是 |
| source | string | 日志的來源 | 是 |
示例代碼:上傳日志
PutLogsResponse response = client.PutLogs(logProject, logUnit, logItems, "");
PutLogsResponseBody res = response.getBody();
System.out.println("response: statusCode:"+res.getStatusCode()+",message"+res.getMessage()+" ,error:"+res.getError());
PutLogsResponse 里面包含了請求的響應頭header和響應體body,其中接收響應數據的響應體body格式如下:
| 參數 | 類型 | 描述 | 示例 |
|---|---|---|---|
| statusCode | int | 返回碼,取值范圍:0:-正常、-1:嚴重錯誤,其他自定義 | |
| message | string | 狀態描述 | SUCCESS |
| error | string | 參考錯誤編碼列表 |
日志服務相關錯誤編碼(部分):
| statusCode | error | message |
|---|---|---|
| -1 | LTS_8000 | 請求失敗,請稍候重試,或提交工單反饋 |
| -1 | LTS_8001 | 內容不合法,無法解析 |
| -1 | LTS_8004 | 日志內容包含的日志必須小于[x] MB和[y]條 |
| -1 | LTS_8006 | 日志內容解壓失敗 |
| -1 | LTS_8007 | Token失效,請重新獲取 |
| -1 | LTS_8009 | 無云日志服務產品實例,請先開通云日志服務 |
| -1 | LTS_8010 | 日志項目不存在 |
| -1 | LTS_8011 | 日志單元不存在 |
| -1 | LTS_8013 | 在1個日志項目下,寫入流量最大限制:200MB/s |
| -1 | LTS_8014 | 在1個日志項目下,寫入次數最大限制:1000次/s |
| -1 | LTS_8015 | 在1個日志單元下,寫入流量最大限制:100MB/s |
| -1 | LTS_8016 | 在1個日志單元下,寫入次數最大限制:500次/s |
| -1 | LTS_18000 | 調用ITIAM的接口失敗 |
5. 服務代碼-異步上傳
異步上傳是為了解決同步上傳無法高頻異步發送等問題所增加的模塊。原理是會開啟多個線程,當調用日志發送接口后,會立刻返回,而內部的線程會將日志數據緩存合并,最后進行批量發送。異步上傳特點如下:
- 線程安全設計 - producer 接口的所有對外暴露方法均經過精心設計,確保在多線程環境下安全無虞。
- 高效異步傳輸 - 調用 producer 的發送接口,用戶可以迅速獲得響應,而數據的實際發送則會在后臺異步進行,并通過緩存和合并機制優化傳輸效率。
- 智能重試機制 - producer 配備智能重試功能,對于可重試的異常,將按照用戶預設的最大重試次數和退避策略自動重試,確保數據的穩定傳輸。
- 詳盡的行為跟蹤 - 用戶可通過 callback 或 future 機制獲取數據發送的實時狀態,包括每次嘗試發送的詳細信息,為問題排查和決策制定提供有力支持。
- 上下文一致性 - producer 保證同一實例產生的日志在服務端保持上下文一致,便于用戶查看和分析日志間的關聯關系。
- 優雅關閉流程 - 當用戶調用 close 方法時,producer 將確保緩存中的所有數據得到妥善處理,并為用戶提供關閉完成的通知,確保資源得到正確釋放。
性能卓越,在面臨海量數據和高資源壓力的場景下,producer 憑借多線程、智能緩存和批量發送等高級功能,幫助用戶輕松達到目標吞吐量,同時簡化了程序設計和開發流程。
異步處理優勢,在內存資源充足的情況下,producer 的異步發送機制使得用戶調用 send 方法時無需等待,實現了計算與 I/O 邏輯的有效分離。用戶可以通過返回的 future 或 callback 隨時了解數據發送狀態。
精細的資源管理,用戶可以通過靈活配置參數,精確控制 producer 使用的內存大小和發送任務的線程數。這不僅避免了資源的無限制消耗,還能根據實際應用場景平衡資源消耗和寫入性能。
5.1.關于Producer操作
5.1.1. LogProducer()
此操作是初始化producer的,producer可以看作是一個啟動器,內部封裝了異步線程的初始化、啟動和關閉等功能,只需要對producer進行操作,即可安全便捷地控制這些異步的線程。使用這份producerConfig配置去初始化一個producer。
ProducerConfig producerConfig = new ProducerConfig();
producerConfig.setLingerMs(2000);
producerConfig.setBatchCountThreshold(4096);
....
Producer producer = new LogProducer(producerConfig);
producerConfig內的屬性是異步操作中的線程所需要的參數,如果不設置參數,則初始化的時候會使用默認的參數,默認參數如下所示:
// 默認總大小(字節)。默認情況下,總大小限制為100MB。
int DEFAULT_TOTAL_SIZE_IN_BYTES = 100 * 1024 * 1024;
// 默認最大塊處理時間。默認情況下,單個塊的最大處理時間為1分鐘。
long DEFAULT_MAX_BLOCK_MS = 60 * 1000L;
// 默認IO線程數。默認值為可用處理器數量的最大值和1之間的較大值。
int DEFAULT_IO_THREAD_COUNT = Math.max(Runtime.getRuntime().availableProcessors(), 1);
// 默認批次大小閾值。當數據量達到512KB時,將觸發一批次的處理。
int DEFAULT_BATCH_SIZE_THRESHOLD_IN_BYTES = 512 *1024;
// 最大批次大小(字節)。默認情況下,單個批次的最大大小限制為5MB。
int MAX_BATCH_SIZE_IN_BYTES = 5* 1024 * 1024;
// 默認批次計數閾值。當積累達到4096個條目時,將觸發一批次的處理。
int DEFAULT_BATCH_COUNT_THRESHOLD = 4096;
// 最大批次計數。默認情況下,單個批次的最大條目數限制為40960。
int MAX_BATCH_COUNT = 40960;
// 默認linger時間(毫秒)。 linger等待的時間默認為2秒。
int DEFAULT_LINGER_MS = 2000;
// Linger時間的下限。linger時間的最小值為100毫秒。
int LINGER_MS_LOWER_LIMIT = 100;
// 默認重試次數。默認情況下,操作失敗后將重試10次。
int DEFAULT_RETRIES = 10;
//默認基礎重試退避時間(毫秒)。重試之間的默認最小間隔為100毫秒。
long DEFAULT_BASE_RETRY_BACKOFF_MS = 100L;
// 默認最大重試退避時間(毫秒)。重試之間的最大間隔默認為50秒。
long DEFAULT_MAX_RETRY_BACKOFF_MS = 50 * 1000L;
5.1.2. BuildClient()
此操作是根據projectConfig配置初始化client,它會根據配置中的logProject屬性去初始化一個client,不同的logProject會構建不同的client,使用不同的配置就可以構建多個client,每個client負責該project項目下的日志發送任務。
//根據不同配置生成不同client,日志上傳互不影響
producer.buildClient(logProject, endpoint, accessKey, secretKey);
producer.buildClient(logProject2, endpoint, accessKey, secretKey);
5.1.3. Close()
此操作是用于關閉producer。當不再需要發送數據或當前進程即將終止時,關閉producer是必要的步驟,以確保producer中緩存的所有數據都能得到妥善處理。當前,producer提供了兩種關閉模式:安全關閉和有限關閉。
安全關閉模式確保在關閉producer之前,所有緩存的數據都已完成處理,所有相關線程都已關閉,并且所有注冊的回調函數都已執行完畢。一旦producer被安全關閉,緩存的批次數據會立即得到處理,并且不會被重試。如果回調函數沒有被阻塞,close方法通常能夠迅速返回。
Thread.sleep(5000);
producer.close();
有限關閉模式適用于那些可能存在阻塞回調函數的場景,但您又希望close方法能在指定的時間內返回。為此,可以使用close(long timeoutMs)方法,并指定一個超時時間。如果超過了指定的timeoutMs時間后producer仍未完全關閉,該方法將拋出一個IllegalStateException異常,這意味著可能還有部分緩存的數據未及時處理就被丟棄,同時用戶注冊的回調函數也可能不會被執行。
producer.close(5000);
5.2.關于異步發送操作
5.2.1. SendLogs()
此操作是將日志發送到后臺的日志累加器隊列中,然后立刻返回。累加器的狀態達到可發送條件時(日志量達到閾值或者等待時間達到閾值),后臺任務的線程將里面的日志進行打包批量發送。
producer.sendLogs(logProject, logUnit, logItem, customerCallback());
//或者
producer.sendLogs(logProject, logUnit, logItems);
sendLogs()方法有很多重載方法,可以滿足多種類型的發送,既可以發送單條日志,也可以發送多條日志,同時也可以根據需求是否需要結果返回值。類型如下:
sendLogs(String logProject, String logUnit, LogItem logItem)
sendLogs(String logProject, String logUnit, List<LogItem> logItems)
sendLogs(String logProject, String logUnit, LogItem logItem, Callback callback)
sendLogs(String logProject, String logUnit, List<LogItem> logItems, Callback callback)
...
5.3.關于獲取發送結果的操作
由于 producer 提供的所有發送方法都是異步的,需要通過返回的 future 或者傳入的 callback 獲取發送結果。
5.3.1. Future
SendLogs 方法會返回一個 ListenableFuture,它除了可以像普通 future 那樣通過調用 get 方法阻塞獲得發送結果外,還允許注冊回調方法(回調方法會在完成 future 設置后被調用)。
以下代碼片段展示了 ListenableFuture 的使用方法,ExexutorFuture.java 對sendLogs()進行了回調結果的封裝,現在只需要調用executorFuture.executeTask()就可自動返回回調結果。
int threadNum = 4;
ExecutorFuture executorFuture = new ExecutorFuture(producer,threadNum);
for (int i = 0; i < 1000; i++) {
executorFuture.executeTask(logProject, logUnit, Utils.generateLogItem(5));
}
Thread.sleep(5000); //關閉producer前,等待5s
producer.close();
executorFuture.shutdown();
用戶可以自定義回調函數,為該 future 注冊一個 FutureCallback 并將其投遞到應用提供的線程池 executorService中執行,完整樣例可參考 SamplePutLogsFuture.java、ExexutorFuture.java。
ListenableFuture<Result> future = producer.sendLogs(logProject, logUnit, logItem);
Futures.addCallback(future, new FutureCallback<Result>() {
@Override
public void onSuccess(Result result) {
System.out.println("response : " + result.getErrorMessage()+" , Code:"+result.getErrorCode());
}
@Override
public void onFailure(Throwable t) {
System.err.println("response : " + t);
}
}, executorService); // 在執行器服務的線程中執行回調
5.3.2. Callback
除了使用 future 外,您還可以通過在調用 send 方法時注冊 callback 獲取數據發送結果,代碼片段如下。(完整樣例可參考 SamplePutLogsCallback.java)
producer.sendLogs(logProject, logUnit, logItem, customCallback());
private static Callback customCallback() {
Callback callback = new Callback() {
@Override
public void onCompletion(Result result) {
? ? // 處理回調結果
? ? if (result.isSuccessful()) {
? ? LOGGER.info("response: {} , {}", result.getErrorMessage(), ? result.getErrorCode());
? ? } else {
? ? ? LOGGER.info("response: {} , {}", result.getErrorMessage(), ? result.getErrorCode());
? ? ? }
? ? }
? };
? return callback;
}