亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

spark實踐之使用TransportClient往ES寫入數據

2023-10-31 02:20:15
60
0

問題導讀:
1、如何使用spark連接ES?
2、如何使用TransportClient往ES批量導入數據?
3、在編寫代碼中踩了哪些坑?
4、ES中如何創建索引?


他們之前把數據導入ES是通過單機的程序導的,或者通過logstash從kafka往ES導,但當數據量很大的時候就會變得很低效,我這兩天調研了一下把數據從hdfs直接通過spark導入ES的方法,當然,也適合spark Streaming程序;
這里指出版本號是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API變動比較頻繁,因此最好參考官網文檔。

連接ES的方法列舉

  •     ES官網中給出了一個與spark連接的方法:是通過RDD可以直接調用 saveToEs 方法實現的;
  •     如果數據量不大的話,可以參考ES提供的RestFulAPI來實現;
  •     本文主要說明我使用的方法,通過 TransportClient 和 bulk 批處理操作來實現,這種方法比較適合數據量很大的情況,又可以靈活處理。


使用TransportClient往ES批量導入的方法

樣例代碼如下:   

[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
  * Author: wangxiaogang
  * Date: 2017/7/11
  * Email: Adamyuanyuan@gmail.com
  * hdfs 中的數據根據格式寫到ES中
  */
object HdfsToEs {
  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
      System.exit(1)
    }
    val hdfsInputPath: String = args(0)
    println("hdfsInputPath: " + hdfsInputPath)
    val conf = new SparkConf().setAppName("HdfsToEs")
    val sc = new SparkContext(conf)
    //插入相關,索引 類型 id相關  以args方式提供接口。
    val esIndex: String = args(1)
    val esType: String = args(2)
    val partition: Int = args(3).toInt
    val bulkNum: Int = args(4).toInt
    val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
    val startTime: Long = System.currentTimeMillis
    println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
    hdfsRdd.foreachPartition {
      eachPa => {
        //        生產環境
        val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
          .put("transport.type", "netty3").put("http.type", "netty3").build
        val client: TransportClient = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        var bulkRequest: BulkRequestBuilder = null
        var flag = true
        var lineNum = 0
        for (eachLine <- eachPa) {
          // 每個bulk是10-15M為宜,數據封裝為bulk后會較原來的數據略有增大,如果每行數據約為 1.5KB,則每 10000 行為一個bulk
          if (flag) {
            bulkRequest = client.prepareBulk
            flag = false
          }
          val strArray: Array[String] = eachLine.split("###")
          if (strArray.length != 25) {
            // 表示這行數據又問題,為了不影響整體,則跳過
            println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
          } else {
              // LinkedHashMap讓ES中的數據變得有序
            val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
            val id: String = strArray(0)
            esDataMap.put("msisdn", id)
            // 數據合并后的格式為: msisdn###w0的前三###w1的前三###如果為空的話就是null...###w23的前三,共25列
            for (i <- 1 to 24) {
              val locTimesListStr = strArray(i)
              val esDataKey = "w" + (i - 1)
              if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
                esDataMap.put(esDataKey, "")
              } else {
                esDataMap.put(esDataKey, locTimesListStr)
              }
            }
            bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
            lineNum += 1
            if (lineNum % bulkNum == 0) {
              val endTime: Long = System.currentTimeMillis
              println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
              val bbq: BulkResponse = bulkRequest.execute.actionGet()
              flag = true
              if (bbq.hasFailures) {
                println("bbq.hasFailures: " + bbq.toString)
                bulkRequest.execute.actionGet
              }
            }
          }
        }
        if (bulkRequest != null) {
          bulkRequest.execute().actionGet()
        }
        client.close()
        val endTime: Long = System.currentTimeMillis
        println("ths time is: " + (endTime - startTime) / 1000 + "s ")
      }
    }
    sc.stop()
  }
}[/mw_shl_code]

踩坑說明:在編寫代碼中踩了如下坑:

  •     依賴沖突的問題: ES5.2與Spark1.6有如下包會產生依賴: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
  •     解決方案:
  •     通過 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依賴原因,然后在pom.xml中增加所需的相關依賴的最高版本;
  •     每個bulk的大小,根據網上的經驗是10M-15M為宜,大概計算一下就好了;
  •     后來在單機測試通過,但在集群模式中還是會出現 netty4的依賴沖突:


[mw_shl_code=java,true]   17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
    java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
            at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
            at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
            at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
            at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
            at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
            at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
            at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
            at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
            at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]

有一種解決方案我沒有嘗試成功,就是在pom中將沖突的依賴包exclusions掉,各位感興趣可以嘗試,成功了麻煩告知我一下。參考鏈接:, 使用 maven-shade-plugin 工具打包。

上個方法我嘗試幾次不成功后,使用了比較暴力的方法,直接將ES的netty參數由netty4改成了netty3,

.put("transport.type", "netty3").put("http.type", "netty3").build

好了,打包好之后,程序就可以完美運行了。

ES中創建索引

就算如果ES中是自動創建索引的,也希望你能手動創建索引和字段屬性,因為默認的字段屬性是Text,ES會自動對它進行分詞相關的操作,如果ES中存的字符串你不想讓它被分隔的話,就用keyword替代為Text類型,命令如下:

[mw_shl_code=java,true]PUT  /weekend-20170718
{
  "settings" : {
    "index" : {
      "number_of_shards" : 5,
      "number_of_replicas" : 1,
      "refresh_interval" : "60s"
    },
  "index.routing.allocation.include.zone": "light"
  },
  "mappings": {
    "offline": {
      "properties": {
        "msisdn": {
          "type": "keyword"
        },"w0": {
          "type": "keyword"
        } ...后面省略
      }
    }
  }
}[/mw_shl_code]

創建好索引后檢查一下:

GET /weekend-20170718/_mapping

集群中運行

這個比較簡單,只需要注意以下幾點就好了:

  •     使用jdk1.8版本;
  •     注意內存的申請,可能會出現跑了一段時間后,內存不夠用導致程序退出的情況;
  •     觀測好ES集群的狀態,一段時間后,ES機器的GC比較高
  •     最好別一下子跑所有數據,分幾批跑,這樣就算出問題,只需要重跑那一部分就好了


數據:通過觀察,導入的速度隨著時間的增長呈下降趨勢,整體來說,ES集群隔離的小集群共有五臺物理機,共2.23億條,751G的數據導入用了約4.5小時,平均速度為 45M/s, 1.38W條/s。

0條評論
作者已關閉評論
王****剛
4文章數
0粉絲數
王****剛
4 文章 | 0 粉絲
原創

spark實踐之使用TransportClient往ES寫入數據

2023-10-31 02:20:15
60
0

問題導讀:
1、如何使用spark連接ES?
2、如何使用TransportClient往ES批量導入數據?
3、在編寫代碼中踩了哪些坑?
4、ES中如何創建索引?


他們之前把數據導入ES是通過單機的程序導的,或者通過logstash從kafka往ES導,但當數據量很大的時候就會變得很低效,我這兩天調研了一下把數據從hdfs直接通過spark導入ES的方法,當然,也適合spark Streaming程序;
這里指出版本號是有必要的,spark版本:1.6.2 ES版本:5.2.1,由于ES的API變動比較頻繁,因此最好參考官網文檔。

連接ES的方法列舉

  •     ES官網中給出了一個與spark連接的方法:是通過RDD可以直接調用 saveToEs 方法實現的;
  •     如果數據量不大的話,可以參考ES提供的RestFulAPI來實現;
  •     本文主要說明我使用的方法,通過 TransportClient 和 bulk 批處理操作來實現,這種方法比較適合數據量很大的情況,又可以靈活處理。


使用TransportClient往ES批量導入的方法

樣例代碼如下:   

[mw_shl_code=java,true]import java.net.InetAddress
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.action.bulk.{BulkRequestBuilder, BulkResponse}
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import org.elasticsearch.transport.client.PreBuiltTransportClient
/**
  * Author: wangxiaogang
  * Date: 2017/7/11
  * Email: Adamyuanyuan@gmail.com
  * hdfs 中的數據根據格式寫到ES中
  */
object HdfsToEs {
  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: HdfsToEs <file> <esIndex> <esType> <partition>")
      System.exit(1)
    }
    val hdfsInputPath: String = args(0)
    println("hdfsInputPath: " + hdfsInputPath)
    val conf = new SparkConf().setAppName("HdfsToEs")
    val sc = new SparkContext(conf)
    //插入相關,索引 類型 id相關  以args方式提供接口。
    val esIndex: String = args(1)
    val esType: String = args(2)
    val partition: Int = args(3).toInt
    val bulkNum: Int = args(4).toInt
    val hdfsRdd: RDD[String] = sc.textFile(hdfsInputPath, partition)
    val startTime: Long = System.currentTimeMillis
    println("hdfsRDD partition: " + hdfsRdd.getNumPartitions + " setted partition: " + partition)
    hdfsRdd.foreachPartition {
      eachPa => {
        //        生產環境
        val settings: Settings = Settings.builder.put("cluster.name", "production-es").put("client.transport.sniff", true)
          .put("transport.type", "netty3").put("http.type", "netty3").build
        val client: TransportClient = new PreBuiltTransportClient(settings)
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
          .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("----"), 8300))
        var bulkRequest: BulkRequestBuilder = null
        var flag = true
        var lineNum = 0
        for (eachLine <- eachPa) {
          // 每個bulk是10-15M為宜,數據封裝為bulk后會較原來的數據略有增大,如果每行數據約為 1.5KB,則每 10000 行為一個bulk
          if (flag) {
            bulkRequest = client.prepareBulk
            flag = false
          }
          val strArray: Array[String] = eachLine.split("###")
          if (strArray.length != 25) {
            // 表示這行數據又問題,為了不影響整體,則跳過
            println("ERROR: strArray.length != 25: " + strArray.length + " lineNum: " + lineNum + " strArray(0): " + strArray(0))
          } else {
              // LinkedHashMap讓ES中的數據變得有序
            val esDataMap: java.util.Map[String, String] = new java.util.LinkedHashMap[String, String]
            val id: String = strArray(0)
            esDataMap.put("msisdn", id)
            // 數據合并后的格式為: msisdn###w0的前三###w1的前三###如果為空的話就是null...###w23的前三,共25列
            for (i <- 1 to 24) {
              val locTimesListStr = strArray(i)
              val esDataKey = "w" + (i - 1)
              if (locTimesListStr == null || locTimesListStr.isEmpty || locTimesListStr.equals("null")) {
                esDataMap.put(esDataKey, "")
              } else {
                esDataMap.put(esDataKey, locTimesListStr)
              }
            }
            bulkRequest.add(client.prepareIndex(esIndex, esType, id).setSource(esDataMap))
            lineNum += 1
            if (lineNum % bulkNum == 0) {
              val endTime: Long = System.currentTimeMillis
              println("bulk push, current lineNum: " + lineNum + ", currentTime s: " + ((endTime - startTime) / 1000))
              val bbq: BulkResponse = bulkRequest.execute.actionGet()
              flag = true
              if (bbq.hasFailures) {
                println("bbq.hasFailures: " + bbq.toString)
                bulkRequest.execute.actionGet
              }
            }
          }
        }
        if (bulkRequest != null) {
          bulkRequest.execute().actionGet()
        }
        client.close()
        val endTime: Long = System.currentTimeMillis
        println("ths time is: " + (endTime - startTime) / 1000 + "s ")
      }
    }
    sc.stop()
  }
}[/mw_shl_code]

踩坑說明:在編寫代碼中踩了如下坑:

  •     依賴沖突的問題: ES5.2與Spark1.6有如下包會產生依賴: netty-all:io.netty,com.fasterxml.jackson.core:jackson-core, org.apache.logging.log4j:log4j-core.
  •     解決方案:
  •     通過 mvn dependency:tree -Dverbose -Dincludes=com.fasterxml.jackson.core 命令查出依賴原因,然后在pom.xml中增加所需的相關依賴的最高版本;
  •     每個bulk的大小,根據網上的經驗是10M-15M為宜,大概計算一下就好了;
  •     后來在單機測試通過,但在集群模式中還是會出現 netty4的依賴沖突:


[mw_shl_code=java,true]   17/07/17 10:21:57 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][management][T#1],5,main]
    java.lang.NoSuchMethodError: io.netty.buffer.CompositeByteBuf.addComponents(ZLjava/lang/Iterable;)Lio/netty/buffer/CompositeByteBuf;
            at org.elasticsearch.transport.netty4.Netty4Utils.toByteBuf(Netty4Utils.java:78)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:422)
            at org.elasticsearch.transport.netty4.Netty4Transport.sendMessage(Netty4Transport.java:93)
            at org.elasticsearch.transport.TcpTransport.internalSendMessage(TcpTransport.java:1058)
            at org.elasticsearch.transport.TcpTransport.sendRequestToChannel(TcpTransport.java:1040)
            at org.elasticsearch.transport.TcpTransport.executeHandshake(TcpTransport.java:1555)
            at org.elasticsearch.transport.TcpTransport.openConnection(TcpTransport.java:502)
            at org.elasticsearch.transport.TcpTransport.connectToNode(TcpTransport.java:460)
            at org.elasticsearch.transport.TransportService.connectToNode(TransportService.java:318)
            at org.elasticsearch.client.transport.TransportClientNodesService$SniffNodesSampler$1.run(TransportClientNodesService.java:488)
            at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:527)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]

有一種解決方案我沒有嘗試成功,就是在pom中將沖突的依賴包exclusions掉,各位感興趣可以嘗試,成功了麻煩告知我一下。參考鏈接:, 使用 maven-shade-plugin 工具打包。

上個方法我嘗試幾次不成功后,使用了比較暴力的方法,直接將ES的netty參數由netty4改成了netty3,

.put("transport.type", "netty3").put("http.type", "netty3").build

好了,打包好之后,程序就可以完美運行了。

ES中創建索引

就算如果ES中是自動創建索引的,也希望你能手動創建索引和字段屬性,因為默認的字段屬性是Text,ES會自動對它進行分詞相關的操作,如果ES中存的字符串你不想讓它被分隔的話,就用keyword替代為Text類型,命令如下:

[mw_shl_code=java,true]PUT  /weekend-20170718
{
  "settings" : {
    "index" : {
      "number_of_shards" : 5,
      "number_of_replicas" : 1,
      "refresh_interval" : "60s"
    },
  "index.routing.allocation.include.zone": "light"
  },
  "mappings": {
    "offline": {
      "properties": {
        "msisdn": {
          "type": "keyword"
        },"w0": {
          "type": "keyword"
        } ...后面省略
      }
    }
  }
}[/mw_shl_code]

創建好索引后檢查一下:

GET /weekend-20170718/_mapping

集群中運行

這個比較簡單,只需要注意以下幾點就好了:

  •     使用jdk1.8版本;
  •     注意內存的申請,可能會出現跑了一段時間后,內存不夠用導致程序退出的情況;
  •     觀測好ES集群的狀態,一段時間后,ES機器的GC比較高
  •     最好別一下子跑所有數據,分幾批跑,這樣就算出問題,只需要重跑那一部分就好了


數據:通過觀察,導入的速度隨著時間的增長呈下降趨勢,整體來說,ES集群隔離的小集群共有五臺物理機,共2.23億條,751G的數據導入用了約4.5小時,平均速度為 45M/s, 1.38W條/s。

文章來自個人專欄
文章 | 訂閱
0條評論
作者已關閉評論
作者已關閉評論
0
0