問題導讀:
1、如何使用spark連接ES?
2、如何使用TransportClient往ES批量導入數據?
3、在編寫代碼中踩了哪些坑?
4、ES中如何創建索引?
他們之前把數據導入ES是通過單機的程序導的,或者通過logstash從kafka往ES導,但當數據量很大的時候就會變得很低效,我這兩天調研了一下把數據從hdfs直接通過spark導入ES的方法,當然,也適合spark Streaming程序;
這里指出版本號是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API變動比較頻繁,因此最好參考官網文檔。
連接ES的方法列舉
- ES官網中給出了一個與spark連接的方法:是通過RDD可以直接調用 saveToEs 方法實現的;
- 如果數據量不大的話,可以參考ES提供的RestFulAPI來實現;
- 本文主要說明我使用的方法,通過 TransportClient 和 bulk 批處理操作來實現,這種方法比較適合數據量很大的情況,又可以靈活處理。
使用TransportClient往ES批量導入的方法
樣例代碼如下:
[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
* Author: wangxiaogang
* Date: 2017/7/11
* Email: Adamyuanyuan@gmail.com
* hdfs 中的數據根據格式寫到ES中
*/
object HdfsToEs {
def main(args: Array[String]) {
if (args.length < 5) {
System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
System.exit(1)
}
val hdfsInputPath: String = args(0)
println("hdfsInputPath: " + hdfsInputPath)
val conf = new SparkConf().setAppName("HdfsToEs")
val sc = new SparkContext(conf)
//插入相關,索引 類型 id相關 以args方式提供接口。
val esIndex: String = args(1)
val esType: String = args(2)
val partition: Int = args(3).toInt
val bulkNum: Int = args(4).toInt
val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
val startTime: Long = System.currentTimeMillis
println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
hdfsRdd.foreachPartition {
eachPa => {
// 生產環境
val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
.put("transport.type", "netty3").put("http.type", "netty3").build
val client: TransportClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
var bulkRequest: BulkRequestBuilder = null
var flag = true
var lineNum = 0
for (eachLine <- eachPa) {
// 每個bulk是10-15M為宜,數據封裝為bulk后會較原來的數據略有增大,如果每行數據約為 1.5KB,則每 10000 行為一個bulk
if (flag) {
bulkRequest = client.prepareBulk
flag = false
}
val strArray: Array[String] = eachLine.split("###")
if (strArray.length != 25) {
// 表示這行數據又問題,為了不影響整體,則跳過
println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
} else {
// LinkedHashMap讓ES中的數據變得有序
val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
val id: String = strArray(0)
esDataMap.put("msisdn", id)
// 數據合并后的格式為: msisdn###w0的前三###w1的前三###如果為空的話就是null...###w23的前三,共25列
for (i <- 1 to 24) {
val locTimesListStr = strArray(i)
val esDataKey = "w" + (i - 1)
if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
esDataMap.put(esDataKey, "")
} else {
esDataMap.put(esDataKey, locTimesListStr)
}
}
bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
lineNum += 1
if (lineNum % bulkNum == 0) {
val endTime: Long = System.currentTimeMillis
println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
val bbq: BulkResponse = bulkRequest.execute.actionGet()
flag = true
if (bbq.hasFailures) {
println("bbq.hasFailures: " + bbq.toString)
bulkRequest.execute.actionGet
}
}
}
}
if (bulkRequest != null) {
bulkRequest.execute().actionGet()
}
client.close()
val endTime: Long = System.currentTimeMillis
println("ths time is: " + (endTime - startTime) / 1000 + "s ")
}
}
sc.stop()
}
}[/mw_shl_code]
踩坑說明:在編寫代碼中踩了如下坑:
- 依賴沖突的問題: ES5.2與Spark1.6有如下包會產生依賴: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
- 解決方案:
- 通過 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依賴原因,然后在pom.xml中增加所需的相關依賴的最高版本;
- 每個bulk的大小,根據網上的經驗是10M-15M為宜,大概計算一下就好了;
- 后來在單機測試通過,但在集群模式中還是會出現 netty4的依賴沖突:
[mw_shl_code=java,true] 17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]
有一種解決方案我沒有嘗試成功,就是在pom中將沖突的依賴包exclusions掉,各位感興趣可以嘗試,成功了麻煩告知我一下。參考鏈接:, 使用 maven-shade-plugin 工具打包。
上個方法我嘗試幾次不成功后,使用了比較暴力的方法,直接將ES的netty參數由netty4改成了netty3,
.put("transport.type", "netty3").put("http.type", "netty3").build
好了,打包好之后,程序就可以完美運行了。
ES中創建索引
就算如果ES中是自動創建索引的,也希望你能手動創建索引和字段屬性,因為默認的字段屬性是Text,ES會自動對它進行分詞相關的操作,如果ES中存的字符串你不想讓它被分隔的話,就用keyword替代為Text類型,命令如下:
[mw_shl_code=java,true]PUT /weekend-20170718
{
"settings" : {
"index" : {
"number_of_shards" : 5,
"number_of_replicas" : 1,
"refresh_interval" : "60s"
},
"index.routing.allocation.include.zone": "light"
},
"mappings": {
"offline": {
"properties": {
"msisdn": {
"type": "keyword"
},"w0": {
"type": "keyword"
} ...后面省略
}
}
}
}[/mw_shl_code]
創建好索引后檢查一下:
GET /weekend-20170718/_mapping
集群中運行
這個比較簡單,只需要注意以下幾點就好了:
- 使用jdk1.8版本;
- 注意內存的申請,可能會出現跑了一段時間后,內存不夠用導致程序退出的情況;
- 觀測好ES集群的狀態,一段時間后,ES機器的GC比較高
- 最好別一下子跑所有數據,分幾批跑,這樣就算出問題,只需要重跑那一部分就好了
數據:通過觀察,導入的速度隨著時間的增長呈下降趨勢,整體來說,ES集群隔離的小集群共有五臺物理機,共2.23億條,751G的數據導入用了約4.5小時,平均速度為 45M/s, 1.38W條/s。