Spark如何將數據寫入到DLI表中
使用Spark將數據寫入到DLI表中,主要設置如下參數:
- fs.obs.access.key
- fs.obs.secret.key
- fs.obs.impl
- fs.obs.endpoint
示例如下:
import logging
from operator import add
from pyspark import SparkContext
logging.basicConfig(format='%(message)s', level=logging.INFO)
#import local file
test_file_name = "D://test-data_1.txt"
out_file_name = "D://test-data_result_1"
sc = SparkContext("local","wordcount app")
sc._jsc.hadoopConfiguration().set("fs.obs.access.key", "myak")
sc._jsc.hadoopConfiguration().set("fs.obs.secret.key", "mysk")
sc._jsc.hadoopConfiguration().set("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem")
sc._jsc.hadoopConfiguration().set("fs.obs.endpoint", "myendpoint")
# red: text_file rdd object
text_file = sc.textFile(test_file_name)
# counts
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# write
counts.saveAsTextFile(out_file_name)
通用隊列操作OBS表如何設置AK/SK
- 獲取結果為AK和SK時,設置如下:
- 代碼創建SparkContext
val sc: SparkContext = new SparkContext() sc.hadoopConfiguration.set("fs.obs.access.key", ak) sc.hadoopConfiguration.set("fs.obs.secret.key", sk) - 代碼創建SparkSession
val sparkSession: SparkSession = SparkSession .builder() .config("spark.hadoop.fs.obs.access.key", ak) .config("spark.hadoop.fs.obs.secret.key", sk) .enableHiveSupport() .getOrCreate()
- 獲取結果為ak、sk和securitytoken時,鑒權時,臨時AK/SK和securitytoken必須同時使用,設置如下:
- 代碼創建SparkContext
val sc: SparkContext = new SparkContext() sc.hadoopConfiguration.set("fs.obs.access.key", ak) sc.hadoopConfiguration.set("fs.obs.secret.key", sk) sc.hadoopConfiguration.set("fs.obs.session.token", sts) - 代碼創建SparkSession
val sparkSession: SparkSession = SparkSession .builder() .config("spark.hadoop.fs.obs.access.key", ak) .config("spark.hadoop.fs.obs.secret.key", sk) .config("spark.hadoop.fs.obs.session.token", sts) .enableHiveSupport() .getOrCreate()
說明出于安全考慮,不建議在obs路徑上帶AK/SK信息。而且,如果是在OBS目錄上建表,建表語句path字段給定的obs路徑不能包含AK/SK信息。
如何查看DLI Spark作業的實際資源使用情況
查看Spark作業原始資源配置
登錄DLI 控制臺,單擊左側“作業管理”>“Spark作業”,在作業列表中找到需要查看的Spark作業,單擊“作業ID”前的,即可查看對應Spark作業的原始資源配置參數。
說明在創建Spark作業時,配置了“高級配置”中的參數,此處才會顯示對應的內容。
查看Spark作業實時運行資源
查看Spark作業實時運行資源,即查看有多少CU正在運行。
-
登錄DLI 控制臺,單擊左側“作業管理”>“Spark作業”,在作業列表中找到需要查看的Spark作業,單擊“操作”列中的“SparkUI”。
-
在SparkUI頁面可查看Spark作業實時運行資源。

-
在SparkUI頁面還可以查看Spark作業原始資源配置(只對新集群開放)。
在SparkUI頁面,單擊“Environment”,可以查看Driver信息和Executor信息。
詳見下圖:Driver信息

詳見下圖:Executor信息

將Spark作業結果存儲在MySQL數據庫中,缺少pymysql模塊,如何使用python腳本訪問MySQL數據庫?
1.缺少pymysql模塊,可以查看是否有對應的egg包,如果沒有,在“程序包管理”頁面上傳pyFile。具體步驟參考如下:
a.將egg包上傳到指定的OBS桶路徑下。
b.登錄DLI管理控制臺,單擊“數據管理 > 程序包管理”。
c.在“程序包管理”頁面,單擊右上角“創建”可創建程序包。
d.在“創建程序包”對話框,配置如下參數:
- 包類型:PyFile。
- OBS路徑:選擇1.aegg包所在的OBS路徑。
- 分組設置和分組名稱根據情況選擇。
e.單擊“確定”完成程序包上傳。
f.在報錯的Spark作業編輯頁面,“依賴python文件”處選擇已上傳的egg程序包,重新運行Spark作業。
2.pyspark作業對接MySQL,需要創建跨源鏈接,打通DLI和RDS之間的網絡。
如何在DLI中運行復雜PySpark程序?
數據湖探索(DLI)服務對于PySpark是原生支持的。
對于數據分析來說Python是很自然的選擇,而在大數據分析中PySpark無疑是不二選擇。對于JVM語言系的程序,通常會把程序打成Jar包并依賴其他一些第三方的Jar,同樣的Python程序也有依賴一些第三方庫,尤其是基于PySpark的融合機器學習相關的大數據分析程序。傳統上,通常是直接基于pip把Python庫安裝到執行機器上,對于DLI這樣的Serverless化服務用戶無需也感知不到底層的計算資源,那如何來保證用戶可以完美運行他的程序呢?
DLI服務在其計算資源中已經內置了一些常用的機器學習的算法庫,這些常用算法庫滿足了大部分用戶的使用場景。對于用戶的PySpark程序依賴了內置算法庫未提供的程序庫該如何呢?其實PySpark本身就已經考慮到這一點了,那就是基于PyFiles來指定依賴,在DLI Spark作業頁面中可以直接選取存放在OBS上的Python第三方程序庫(支持zip、egg等)。
對于依賴的這個Python第三方庫的壓縮包有一定的結構要求,例如,PySpark程序依賴了模塊moduleA(import moduleA),那么其壓縮包要求滿足如下結構:
壓縮包結構要求

即在壓縮包內有一層以模塊名命名的文件夾,然后才是對應類的Python文件,通常下載下來的Python庫可能不滿足這個要求,因此需要重新壓縮。同時對壓縮包的名稱沒有要求,所以建議可以把多個模塊的包都壓縮到一個壓縮包里。至此,已經可以完整的運行起來一個大型、復雜的PySpark程序了。
Spark作業訪問MySQL數據庫的方案
通過DLI Spark作業訪問MySQL數據庫中的數據有如下兩種方案:
- 方案1:在DLI中購買按需專屬隊列,創建增強型跨源連接,再通過跨源表讀取MySQL數據庫中的數據,該方案需要用戶自行編寫java代碼或scala代碼。
- 方案2:先使用云數據遷移服務CDM將MySQL數據庫中的數據導入OBS桶中,再通過Spark作業讀取OBS桶中的數據,如果用戶已有CDM集群,該方案比方案1簡單,且不會對現有數據庫造成壓力。
如何通過JDBC設置spark.sql.shuffle.partitions參數提高并行度
操作場景
Spark作業在執行shuffle類語句,包括group by、join等場景時,常常會出現數據傾斜的問題,導致作業任務執行緩慢。
該問題可以通過設置spark.sql.shuffle.partitions提高shuffle read task的并行度來進行解決。
設置spark.sql.shuffle.partitions參數提高并行度
用戶可在JDBC中通過set方式設置dli.sql.shuffle.partitions參數。具體方法如下:
Statement st = conn.stamte()
st.execute("set spark.sql.shuffle.partitions=20")
Spark jar 如何讀取上傳文件
Spark可以使用SparkFiles讀取 –-file中提交上來的文件的本地路徑,即:SparkFiles.get("上傳的文件名")。
說明lDriver中的文件路徑與Executor中獲取的路徑位置是不一致的,所以不能將Driver中獲取到的路徑作為參數傳給Executor去執行。
lExecutor獲取文件路徑的時候,仍然需要使用SparkFiles.get(“filename”)的方式獲取。
lSparkFiles.get()方法需要spark初始化以后才能調用。
代碼段如下所示
package main.java
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession
import scala.io.Source
object DliTest {
def main(args:Array[String]): Unit = {
val spark = SparkSession.builder
.appName("SparkTest")
.getOrCreate()
// driver 獲取上傳文件
println(SparkFiles.get("test"))
spark.sparkContext.parallelize(Array(1,2,3,4))
// Executor 獲取上傳文件
.map(_ => println(SparkFiles.get("test")))
.map(_ => println(Source.fromFile(SparkFiles.get("test")).mkString)).collect()
}
}