Logstash 是一個開源的數據處理管道工具,廣泛用于數據收集、處理和傳輸。它通常作為“ELK Stack”(Elasticsearch、Logstash、Kibana、Beats)的一個核心組件,用于處理結構化和非結構化數據。
天翼云Logstash可以實現將源Elasticsearch實例(如天翼云、自建或第三方Elasticsearch實例)中的數據遷移至天翼云Elasticsearch實例。在升級實例版本、實例架構調整、或跨區域的實例數據遷移時,可以選擇使用天翼云Logstash遷移源Elasticsearch實例數據。
Logstash的方式遷移數據支持跨大版本,且遷移方式靈活,下表是支持的集群版本:
| 源\目標 | Elasticsearch7.10.2 | OpenSearch2.19.1 |
|---|---|---|
| Elasticsearch6.x | √ | √ |
| Elasticsearch7.x小于7.10.2 | √ | √ |
| Elasticsearch7.x大于7.10.2 | √ | √ |
| Elasticsearch8.x | √ | √ |
本文以自建Elasticsearch 7.10.2版本遷移至天翼云Elasticsearch實例為例子。
前提條件
已經創建天翼云Elasticsearch實例。
已經在創建的Elasticsearch實例中加裝了Logstash實例。
加裝Logstash能夠通過內網或公網訪問需要遷移的源Elasticsearch實例。
Logstash工作模型
Logstash工作模型核心部分為三部分:輸入(Input)、過濾器(Filter)、輸出(Output),按照配置管道文件的順序對數據進行提取、處理轉換、輸出。
1.輸入(Input):Logstash支持多種數據輸入源,如文件、數據庫、消息隊列以及Elasticsearch等。在我們的場景中,源Elasticsearch實例就是輸入數據源。Logstash會批量提取源Elasticsearch實例中的數據。
2.過濾器(Filter):過濾器是可選的,用于對輸入數據進行實時處理和轉換。它提供了一些強大的插件,可以對數據進行解析、變換、裁剪或其他操作。在我們的場景中,可以選擇是否使用過濾器來處理遷移中的數據,例如刪除源數據中不需要遷移的字段等操作。
3.輸出(Output):Logstash的輸出插件負責將處理后的數據寫入到目標位置,這可以是文件、數據庫、消息隊列,或者像本例中的天翼云Elasticsearch實例。
遷移必備的信息
源實例(天翼云、自建或第三方Elasticsearch實例)訪問地址、用戶名以及密碼。
目的實例(天翼云Elasticsearch實例)訪問地址、用戶名以及密碼。
提前在目的實例創建好源實例中的待遷移索引的索引結構。
測試Elasticsearch實例服務正常
curl //{ip}:{port}分別將ip和port替換為源實例以及目的實例的實際ip地址和端口號。
使用自建Logstash全量遷移數據
例如進入“Logstash實例管理”界面,在左側導航欄選擇“管道管理”,進入管道管理頁面。選擇新建管道,在Logstash管道管理下創建一個管道名稱為test的管道,寫入下面的配置:
input{
elasticsearch{
# 源Elasticsearch實例的訪問地址
hosts ==> ["//{ip}:{port}", "//{ip}:{port}", "//{ip}:{port}"]
# 訪問源Elasticsearch實例的用戶名和密碼,如無安全機制可不配置
user ==> "*********"
password ==> "********"
# 配置源實例中待遷移的索引,可以使用通配符
index ==> "index1, index2"
# 查詢Elasticsearch實例包含元數據
docinfo ==> true
# 使用多個切片提高吞吐量,合理值的范圍從2到大約8,一般不要超過索引分片數
slices ==> 2
# Logstahs每次查詢Elasticsearch實例返回的最大數據條數
size ==> 1000
}
}
# 在此對數據進行處理
filter {
mutate {
# 移除logstash增加的字段
remove_field ==> ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目標Elasticsearch實例的訪問地址
hosts ==> ["//{ip}:{port}", "//{ip}:{port}", "//{ip}:{port}"]
# 訪問目標Elasticsearch實例的用戶名和密碼,如無安全機制可不配置
user ==> "********"
password ==> "********"
# 配置目標實例的索引,如下配置時和源實例保持一致
index ==> "%{[@metadata][_index]}"
document_id ==> "%{[@metadata][_id]}"
}
}點擊保存并部署完成管道注冊,通過預檢驗后,會進行全量數據遷移。通過日志和目的實例中的索引可以觀察是否數據正常遷移。
待遷移完成后,選擇停止管道,取消注冊。
完成遷移后對比遷移前后索引結構、索引中數據條數、索引存儲等指標,保證數據全部遷移完畢。
使用加裝的Logstash增量遷移數據
增量遷移數據和全量遷移數據類似,區別在于增量遷移需要待遷移的索引中有增量字段。
增量遷移數據的Logstash管道配置文件和全量遷移數據不同在于需要配置具體的’query’。例如,有一個索引中有一個時間字段'created_at',類型為'date',可能的值例如,"2024-10-11T12:34:56Z"。
那么需要添加如下配置,‘query’參數的內容是Elasticsearch實例查詢的語法,例如:
query ==> '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'需要根據具體索引結構來修改。
整個管道文件完成的配置如下:
input{
elasticsearch{
# 源Elasticsearch實例的訪問地址
hosts ==> ["//{ip}:{port}", "//{ip}:{port}", "//{ip}:{port}"]
# 訪問源Elasticsearch實例的用戶名和密碼,如無安全機制可不配置
user ==> "*********"
password ==> "********"
# 配置源實例中待遷移的索引,可以使用通配符
index ==> "index1, index2"
query ==> '{"query":{"bool":{"should":[{"range":{"created_at":{"from":"2024-10-11T12:34:56Z"}}}]}}}'
# 查詢Elasticsearch實例包含元數據
docinfo ==> true
# 使用多個切片提高吞吐量,合理值的范圍從2到大約8,一般不要超過索引分片數
slices ==> 2
# Logstahs每次查詢Elasticsearch實例返回的最大數據條數
size ==> 1000
}
}
# 在此對數據進行處理
filter {
mutate {
# 移除logstash增加的字段
remove_field ==> ["@metadata", "@version"]
}
}
output{
elasticsearch{
# 目標Elasticsearch實例的訪問地址
hosts ==> ["//{ip}:{port}", "//{ip}:{port}", "//{ip}:{port}"]
# 訪問目標Elasticsearch實例的用戶名和密碼,如無安全機制可不配置
user ==> "********"
password ==> "********"
# 配置目標實例的索引,如下配置時和源實例保持一致
index ==> "%{[@metadata][_index]}"
document_id ==> "%{[@metadata][_id]}"
}
}和全量遷移一樣,保存并部署完成管道注冊,即可完成增量遷移。