Spark String 是使用最廣泛的實時處理框架之一,其他包括Apache Flink, Apache Storm, Kafka Streams。
Spark Streamming 相對于其他有更對的性能問題,它的處理是通過時間窗口而不是逐個事件,會導致延遲發生。
良好的配置和開發有有時候會使性能提高10到20倍,
分享一些可以優化的配置:
一. Kafka的直接實現
1) Spark2 的版本中有兩種方式接收kafka數據,一種是兼容kafka0.8.x,另外一種是direct模式,
其中第一種方式缺乏并行性且不兼容TLS, 如果需要同時并行讀取多個topic,就需要使用direct模式
第一種方式:
val kafkaStream = KafkaUtils.createStream(
streamingContext,
[ZK quorum],
[consumer group id],
[per-topic number of Kafka partitions to consume]
)
direct模式并不關心數據是否來自一個或多個topic
val kafkaStream = KafkaUtils.createDirectStream[String, String]
(ssc,locationStrategy, consumerStrategy)
二,offset 偏移量管理
offset偏移量指示分配給spark消費者的數據從何處讀取,這個非常重要,這個保證了推流過程中的HA,
避免出現錯誤時丟失數據。
在kafka 0.10.x和spark 2 中offset的管理是通過kafka,為了防止在數據處理過程中
丟失數據,一般有以下幾種方式:
1.使用checkpointing
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
2.
在代碼中自己管理offset
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// some time later, after outputs have completed
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
三,分區管理
在kafka中對即將接收的topic進行分區消費非常重要,進行分區可以運行在不同的spark執行器中
并行接收數據。
以下是創建三個程序并行分別使用兩個cpu和2gb內存
./bin/kafka-topics.sh --create --replication-factor 1 --partitions 3 --zookeeper zookeeper:2181
--topic topicname
conf.set("spark.cores.max", 6)
conf.set("spark.executor.cores", 2)
conf.set("spark.executor.memory", 2GB)
四, spark streaming的一些優化配置
1. 設置poll的超時時間
在spark2中默認的超時時間是512ms, 如果有很多任務是“Task failed”則需要
增加該值的value繼續觀察
conf.set("spark.streaming.kafka.consumer.poll.ms", 512)
2, 系列化
使用序列化在kafka中非常重要,你可以使用默認的序列化函數
val serializers = Map(
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[RowDeserializer]
)
當然你也可以使用kryo進行序列化
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Foo], classOf[Var]))
conf.set("spark.kryo.registrationRequired", "true")