Spark 任務內存不足情況需要根據具體問題進行具體分析。
首先spark統一內存模型將內存分為off-heap和heap兩部分內存,每一個部分都可能發生內存不足的情況。
off-heap內存不足的情況
off-heap內存作用
為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,存儲經過序列化的二進制數據。
利用JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時不再基于 Tachyon,而是與堆外的執行內存一樣,基于 JDK Unsafe API 實現),Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放,而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。
off-heap參數和動態擴展機制
Spark堆外內存的大小可以由spark.memory.offHeap.size控制,spark off-heap空間只分為 execution 和 storage 1:1兩部分,兩部分可以動態擴展。
off-heap不足
目前常用的off-heap常見是 executor端,map task側 shuffle時候,由于ShuffleExternalSorter占用內存過大,導致內存不足。
此外,在spark native場景中,spark將更多的使用 spark off-heap內存取代heap內存,因此spark off-heap在非shuffle場景下也會占用很多off-heap內存。
如果存在off-heap 內存不足警告,可以酌情添加spark.memory.offHeap.size 或者 降低shuffle 內存緩沖區大小。
heap內存不足的情況
spark將heap內存做如下分割

每一個部分超出使用上線都可能產生內存不足的情況。
Spark整體heap內存是由 spark.executor.memory和spark.driver.memory控制的
Spark將spark.xxxx.memory內存分為 user heap(other)、execution heap、storage heap、system heap(300MB)。
spark.memory.fraction 可以將** **spark.xxxx.memory分割出 heap storage 和 heap execution 兩部分。
Heap storage
Heap storage作用
Spark Heap Storage內存用于存儲spark Rdd cache。
在spark SQL中也可以用于存儲 view、或存儲 cache table的表。
Heap storage不足
Heap storage不足一般發生在executor端。主要是因為 cache RDD 太多了,因為動態占用機制,占用heap execution空間,導致分配到當前executor的heap execution不夠用,從而告警。
我們可以通過spark UI 查看 作業的storage 選項卡,定位告警的executor Id,查看cache的大小,與environment 選項卡中的spark.executor.memory做比對。
這種情況下,我們可以酌情,增加spark.executor.memory,或者uncache 或者 drop view 不需要的RDD 或者cached table。
Heap execution
Heap execution作用
Heap execution 可以用于存儲輸入數據。
Spark是內存計算,需要將父RDD的partition讀入內存中,這個數據將會被blockManager讀入。
在spark shuffle map stage 或者 reduce stage都會發生讀入操作。
Heap execution 可以用于shuffle 緩沖區 和 序列化反序列化。
Spark的一些使用heap緩沖區進行shuffle的場景需要分配heap space。
Heap execution空間不足
一般是讀入的數據太多或者發生了數據傾斜。
spark shuffle map task讀入hdfs文件塊數據。
發生在shuffle map stage,spark需要讀取hdfs 文件塊。
這個文件快太大且不可分割,或者數據壓縮密度太大,會導致task memory 急速膨脹,超過1/n 的 executor heap space 和 executor storage space總和。(此處的n是spark executor的running task數目)。
如果hdfs文件塊可以分割讀入task,嘗試使用spark.files.maxPartitionBytes去分割讀取的hdfs文件 或者 擴大 spark.executor.memory。但是后者不適用于個別hdfs大文件塊情況。
如果hdfs 文件塊是gzip這種的不可壓縮,可能只能擴大spark.files.maxPartitionBytes,或者 在業務上有控制寫入hdfs的文件格式是可分割的。
spark shuffle reduce task讀入shuffle block數據太多了,也就是發生數據傾斜。
一般shuffle block 都會連續小幅度的拉入 reduce task側,此時如果溢出,spark 會將這些數據spill disk。
如果出現reduce task讀取超大shuffle block 或者 并發讀取太快,會導致數據內存膨脹太快,而直接OOM。
此時spark會有task重啟機制,會過濾并發過多情況。如果是讀入shuffle block太大,則會反復的發生task failed情況。
此時可以適當降低map側 shuffle write的shuffle block大小,可以嘗試增加 shuffle reduce的并行度 或者 盡量使用sortBaseShuffleWriter,使用sortBaseShuffleWriter 可以實現map combiner。
如果發生了數據傾斜
此時傾斜executor不會出現Exception,但是內存heap使用量偏高,會發出內存告警。
這一點可以在web UI的task列表中清晰的看到stage的某一個task相對于其他task,其computing time明顯很長。
這種情況下一般是shuffle發生了數據傾斜,首先可以使用map combiner,降低map側的傾斜的shuffle reduce分區的數據量。
此外,可以通過編程,提取出topk的shuffle reduce 分區的key值,然后將對應shuffle Key的rdd partition打上隨機數,做一個均勻分割,然后執行reduceByKey操作,最后去掉隨機數,再執行一次reduceByKey。
如果不想這樣,可以使用hive。
Heap User
Heap User作用
用戶自定義數據結構,這個就是用戶使用spark core api 或者 spark sql api過程中使用了其他數據結構進行編程,因而產生了一些heap消耗。
Heap User不足
存在heap不足的情況,一般是用戶將task的計算結果,RDD.collect 到driver中的數據結構中,這種情況,需要去使用專門的connector,實現分布式計算和讀寫。
還有就是用戶執行sql的時候,會進行非insert 操作,即直接select讀取大表,導致driver內存不足夠,這種情況下建議使用limit 操作,或者 使用spark的流式返回功能。