廣播變量的概念及需求
Spark Application最開始在Driver端,在我們提交任務的時候,需要傳遞到各個Executor的Task上運行。對于一些只讀、固定的數據(比如從DB中讀出的數據),每次需要Driver廣播到各個Task上,每次進下數據的拉取,查詢效率比較低下。廣播變量允許將變量只廣播(提前廣播)給各個Executor。該Executor上的各個Task再從所在節點BlockManager獲取變量,如果本地沒有,那么就從Driver遠程拉取變量副本,并保存在本地的BlockManager中。此后這個executor上的task,都會直接使用本地的BlockManager中的副本。而不是從Driver獲取變量,從而提升效率。
注:一個Executor只需要在第一個Task啟動時,獲得一份Broadcast數據,之后的Task都從本節點的BlockManager中獲取相關數據。
廣播變量的使用方法
1)調用SparkContext.broadcast方法創建一個Broadcast[T]對象。任何序列化的類型都可以這么實現。
2)通過value屬性訪問改對象的值(Java之中為value()方法)
3)變量只會被發送到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)
kryo序列化概念及需求
默認情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,對象輸入輸出流機制,來進行序列化。
這種默認序列化機制的好處在于,處理起來比較方便,也不需要手動去做什么事情,只在算子里面使用的變量,必須是實現Serializable接口的,可序列化即可。
但是缺點在于,默認的序列化機制的效率不高,序列化的速度比較慢,序列化以后的數據,占用的內存空間相對還是比較大。
Spark支持使用Kryo序列化機制。這種序列化機制,比默認的Java序列化機制速度要快,序列化后的數據更小,大概是Java序列化機制的1/10。
所以Kryo序列化優化以后,可以讓網絡傳輸的數據變少,在集群中耗費的內存資源大大減少。
Kryo序列化機制啟用以后生效的幾個地方
1)、算子函數中使用到的外部變量,使用Kryo以后:優化網絡傳輸的性能,可以優化集群中內存的占用和消耗
2)、持久化RDD,優化內存的占用和消耗。持久化RDD占用的內存越少,task執行的時候,創建的對象,就不至于頻繁的占滿內存,頻繁發生GC。
3)、shuffle:可以優化網絡傳輸的性能
Kryo序列化使用方法
第一步,在SparkConf中設置一個屬性,spark.serializer,org.apache.spark.serializer.KryoSerializer類。
第二步,注冊你使用的需要通過Kryo序列化的一些自定義類,SparkConf.registerKryoClasses()。
項目中的使用:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(new Class[]{CategorySortKey.class})