操作場景
Flume支持(chi)將(jiang)采(cai)集的日志信(xin)息導(dao)入(ru)到Kafka。
前提條件
- 已創建啟用Kerberos認證的流集群。
- 已在日志生成節點安裝Flume客戶端,例如安裝目錄為“/opt/Flumeclient”,客戶端安裝請參見“組件操作指南 > 使用Flume > 安裝Flume客戶端”。以下操作的客戶端目錄只是舉例,請根據實際安裝目錄修改。
- 已配置網絡,使日志生成節點與流集群互通。
使用Flume客戶端(MRS 3.x之前版本)

說明普通(tong)集群(qun)不需要執行步驟(zou)1-步驟(zou)5。
- 將Master1節點上的認證服務器配置文件,復制到安裝Flume客戶端的節點,保存到Flume客戶端中“Flume客戶端安裝目錄/fusioninsight-flume-Flume 組件版本號 /conf”目錄下。
文件完整路(lu)徑為${BIGDATA_HOME}/MRS_Current/1_ X _KerberosClient/etc/kdc.conf。
其中(zhong)“X”為(wei)隨(sui)機(ji)生(sheng)成的數字(zi),請根(gen)據實際情況修改。同時文件(jian)需要以Flume客戶(hu)端(duan)安(an)裝用戶(hu)身(shen)份保存,例如(ru)root用戶(hu)。
- 查看任一部署Flume角色節點的“業務IP”。
登錄(lu)集(ji)群(qun)詳(xiang)情(qing)頁面,選擇“集(ji)群(qun)>組(zu)件管(guan)理 >Flume > 實(shi)例(li)”,查看任一部署(shu)Flume角(jiao)色節點的“業務IP”。

說明若集(ji)群(qun)詳情(qing)頁面沒有“組件管理(li)”頁簽,請先完(wan)成IAM用戶同步(在集(ji)群(qun)詳情(qing)頁的“概覽”頁簽,單擊(ji)“IAM用戶同步”右側(ce)的“同步”進行IAM用戶同步)。
- 將此節點上的用戶認證文件,復制到安裝Flume客戶端的節點,保存到Flume客戶端中“Flume客戶端安裝目錄/fusioninsight-flume-Flume 組件版本號 /conf”目錄下。
文(wen)件完整路(lu)徑為${BIGDATA_HOME}/MRS_ XXX /install/FusionInsight-Flume-Flume 組件版(ban)本(ben)號(hao) /flume/conf/flume.keytab。
其(qi)中“XXX”為產品版本號,請根據實際情(qing)況修(xiu)改。同(tong)時文件需要(yao)以(yi)Flume客戶端安裝用戶身份保存(cun),例如root用戶。
- 將此節點上的配置文件“jaas.conf”,復制到安裝Flume客戶端的節點,保存到Flume客戶端中“conf”目錄。
文(wen)件完整路徑為${BIGDATA_HOME}/MRS_Current/1_ X _Flume/etc/jaas.conf。
其中“X”為隨機生成的數字,請根據實際情況修改。同(tong)時文件(jian)需要以Flume客戶端安裝用戶身份(fen)保存(cun),例如(ru)root用戶。
- 登錄安裝Flume客戶端節點,切換到客戶端安裝目錄,執行以下命令修改文件:
vi conf/jaas.conf
修改參數(shu)“keyTab”定義(yi)的用(yong)戶認(ren)(ren)證文(wen)件(jian)完(wan)整路(lu)徑即步驟3中保存(cun)用(yong)戶認(ren)(ren)證文(wen)件(jian)的目(mu)錄(lu):“Flume客戶端安裝目(mu)錄(lu)/fusioninsight-flume-Flume 組(zu)件(jian)版本號 /conf”,然后保存(cun)并退出。
- 執行以下命令,修改Flume客戶端配置文件“flume-env.sh”:
vi Flume 客戶端安裝目錄 / fusioninsight-flume- Flume 組件版本號 /conf/flume-env.sh
在(zai)“-XX:+UseCMSCompactAtFullCollection”后面,增加以下內容:
-Djava.security.krb5.conf= Flume 客戶端安裝目錄 /fusioninsight-flume-1.9.0/conf/kdc.conf
-Djava.security.auth.login.config=Flume 客戶端安裝目錄 /fusioninsight-flume-1.9.0/conf/jaas.conf
-Dzookeeper.request.timeout=120000
例如: "-XX:+UseCMSCompactAtFullCollection
-Djava.security.krb5.conf=/opt/FlumeClient / fusioninsight-flume- Flume 組件版本號 /conf/kdc.conf
-Djava.security.auth.login.config=/opt/FlumeClient / fusioninsight-flume- Flume 組件版本號 /conf/jaas.conf
-Dzookeeper.request.timeout=120000"
請(qing)根據實(shi)際情況(kuang),修改“Flume客戶端安裝目錄”,然后保存(cun)并(bing)退(tui)出。
- 執行以下命令,重啟Flume客戶端:
cd Flume 客戶端安裝目錄 /fusioninsight-flume- Flume 組件版本號 /bin
./flume-manage.shrestart
例如:
cd /opt/FlumeClient/fusioninsight-flume- Flume 組件版本號 /bin./flume-manage.shrestart
- 執行以下命令,根據實際業務需求,在Flume客戶端配置文件“properties.properties”中配置并保存作業。
vi Flume 客戶端安裝目錄 /fusioninsight-flume- Flume 組件版本號 /conf/properties.properties
以配置SpoolDir Source+File Channel+Kafka Sink為例:
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = 監控目錄
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = 傳輸過程中元數據存儲路徑
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = 數據緩存路徑,設置多個路徑可提升性能,中間用逗號分開
client.channels.static_log_channel.checkpointDir = 檢查點存放路徑
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = 數據寫入的topic ,如flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX . XXX . XXX . XXX :kafka 端口號 , XXX . XXX . XXX . XXX :kafka 端口號 , XXX . XXX . XXX . XXX :kafka端口號
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka Domain名稱,安全集群必填,如hadoop.xxx.1com
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel
說明
client.sinks.kafka_sink.kafka.topic:數據寫(xie)入的topic。若(ruo)kafka中該topic不存在(zai),默認情(qing)況下會自動創建(jian)該topic。
client.sinks.kafka_sink.kafka.bootstrap.servers:Kafkabrokers列表,多個用英(ying)文(wen)逗號分隔(ge)。默認情況(kuang)下,安全集(ji)群端口21007,普通(tong)集(ji)群對應端口9092。
client.sinks.kafka_sink.kafka.security.protocol:安全集群(qun)為SASL_PLAINTEXT,普(pu)通集群(qun)為PLAINTEXT。
client.sinks.kafka_sink.kafka.kerberos.domain.name:
普通集(ji)群無(wu)需配置此參數(shu)。安全集(ji)群對(dui)(dui)應此參數(shu)的(de)值為Kafka集(ji)群中“kerberos.domain.name”對(dui)(dui)應的(de)值。
具體可到(dao)Broker實例所在(zai)節點上查看${BIGDATA_HOME}/MRS_Current/1_ X _Broker/etc/server.properties。
其中(zhong)X為(wei)隨(sui)機生(sheng)成的數字,請根(gen)據實際(ji)情況修改。同時文件需要以Flume客(ke)戶端安裝(zhuang)用戶身份保存,例如(ru)root用戶。
具體可(ke)到Broker實例所(suo)在節點(dian)上查看“${BIGDATA_HOME}/FusionInsight_Current/1_X_Broker/etc/server.properties”。
- 參數配置并保存后,Flume客戶端將自動加載“properties.properties”中配置的內容。當spoolDir生成新的日志文件,文件內容將發送到Kafka生產者,并支持Kafka消費者消費。
使用Flume客戶端(MRS 3.x及之后版本)
說明普(pu)通集群不需(xu)要(yao)執行(xing)1-5。
- 將Master1節點上的認證服務器配置文件,復制到安裝Flume客戶端的節點,保存到Flume客戶端中Flume 客戶端安裝目錄 /fusioninsight-flume-Flume 組件版本號 /conf目錄下。
文件完整路徑為“${BIGDATA_HOME}/FusionInsight_BASE_ XXX /1_ X _KerberosClient/etc/kdc.conf”。其中“XXX”為產品版本號,“X”為隨機生成的數字,請根據實際情況修改。同時文件需要以Flume客戶端安裝用戶身份保存,例如root用戶。
- 查看任一部署Flume角色節點的“業務IP”。
登錄FusionInsight Manager頁面,具體請參見訪問FusionInsight Manager(MRS 3.x及之后版本),選擇“集群(qun) > 服務 > Flume > 實例”。查看任一部(bu)署(shu)Flume角(jiao)色(se)節點的“業(ye)務IP”。

說明若集(ji)群(qun)詳情(qing)頁(ye)面沒有(you)“組件管理”頁(ye)簽,請先完成(cheng)IAM用戶(hu)同步(在集(ji)群(qun)詳情(qing)頁(ye)的(de)“概覽”頁(ye)簽,單擊“IAM用戶(hu)同步”右側(ce)的(de)“同步”進行IAM用戶(hu)同步)。
- 將此節點上的用戶認證文件,復制到安裝Flume客戶端的節點,保存到Flume客戶端中“Flume客戶端安裝目錄/fusioninsight-flume-Flume 組件版本號 /conf”目錄下。
文件完(wan)整路徑為${BIGDATA_HOME}/FusionInsight_Porter_ XXX /install/FusionInsight-Flume-Flume 組件版本號 /flume/conf/flume.keytab。
其中“XXX”為產品版本號,請根據(ju)實際情(qing)況修改。同時文件(jian)需要以Flume客戶端安裝用戶身(shen)份保存,例如(ru)root用戶。
- 將此節點上的配置文件“jaas.conf”,復制到安裝Flume客戶端的節點,保存到Flume客戶端中“conf”目錄。
文件(jian)完整路徑為${BIGDATA_HOME}/FusionInsight_Current/1_ X _Flume/etc/jaas.conf。
其中“X”為隨機生成的數字,請根(gen)據實(shi)際(ji)情況修(xiu)改(gai)。同(tong)時文件需要以Flume客戶端安裝用戶身(shen)份保存,例如(ru)root用戶。
- 登錄安裝Flume客戶端節點,切換到客戶端安裝目錄,執行以下命令修改文件:
vi conf/jaas.conf
修改(gai)參數(shu)“keyTab”定義的用戶認(ren)證文(wen)件(jian)完整路徑(jing)即步(bu)驟3中(zhong)保存(cun)用戶認(ren)證文(wen)件(jian)的目(mu)錄:“Flume客(ke)戶端(duan)安裝目(mu)錄/fusioninsight-flume-Flume 組件(jian)版本號(hao) /conf”,然(ran)后(hou)保存(cun)并退出。
- 執行以下命令,修改Flume客戶端配置文件“flume-env.sh”:
vi Flume 客戶端安裝目錄 / fusioninsight-flume- Flume 組件版本號 /conf/flume-env.sh
在“-XX:+UseCMSCompactAtFullCollection”后面,增(zeng)加(jia)以下內容:
-Djava.security.krb5.conf=Flume 客戶端安裝目錄 /fusioninsight-flume-1.9.0/conf/kdc.conf
-Djava.security.auth.login.config=Flume 客戶端安裝目錄 /fusioninsight-flume-1.9.0/conf/jaas.conf -Dzookeeper.request.timeout=120000
例如: "-XX:+UseCMSCompactAtFullCollection
-Djava.security.krb5.conf=/opt/FlumeClient / fusioninsight-flume- Flume 組件版本號 /conf/kdc.conf -Djava.security.auth.login.config=/opt/FlumeClient / fusioninsight-flume- Flume 組件版本號 /conf/jaas.conf
-Dzookeeper.request.timeout=120000"
請根據實際(ji)情況,修改“Flume客戶端安裝目錄”,然后保存并(bing)退(tui)出。
- 執行以下命令,重啟Flume客戶端:
cd Flume 客戶端安裝目錄 /fusioninsight-flume- Flume 組件版本號 /bin
./flume-manage.shrestart
例如:
cd /opt/FlumeClient/fusioninsight-flume- Flume 組件版本號 /bin
./flume-manage.sh restart
- 根據實際業務場景配置作業。
- MRS 3.x及之后版本部分參數可直接在Manager界面配置。
- 在“properties.properties”文件中配置,以配置SpoolDir Source+File Channel+Kafka Sink為例。
在(zai)安裝Flume客戶(hu)端(duan)的節點執行以下命令,根據實(shi)際業務(wu)需求,在(zai)Flume客戶(hu)端(duan)配(pei)置文件“properties.properties”中配(pei)置并保存作業。
vi Flume 客戶端安裝目錄 /fusioninsight-flume- Flume 組件版本號 /conf/properties.properties
#########################################################################################
client.sources = static_log_source
client.channels = static_log_channel
client.sinks = kafka_sink
#########################################################################################
#LOG_TO_HDFS_ONLINE_1
client.sources.static_log_source.type = spooldir
client.sources.static_log_source.spoolDir = 監控目錄
client.sources.static_log_source.fileSuffix = .COMPLETED
client.sources.static_log_source.ignorePattern = ^$
client.sources.static_log_source.trackerDir = 傳輸過程中元數據存儲路徑
client.sources.static_log_source.maxBlobLength = 16384
client.sources.static_log_source.batchSize = 51200
client.sources.static_log_source.inputCharset = UTF-8
client.sources.static_log_source.deserializer = LINE
client.sources.static_log_source.selector.type = replicating
client.sources.static_log_source.fileHeaderKey = file
client.sources.static_log_source.fileHeader = false
client.sources.static_log_source.basenameHeader = true
client.sources.static_log_source.basenameHeaderKey = basename
client.sources.static_log_source.deletePolicy = never
client.channels.static_log_channel.type = file
client.channels.static_log_channel.dataDirs = 數據緩存路徑,設置多個路徑可提升性能,中間用逗號分開
client.channels.static_log_channel.checkpointDir = 檢查點存放路徑
client.channels.static_log_channel.maxFileSize = 2146435071
client.channels.static_log_channel.capacity = 1000000
client.channels.static_log_channel.transactionCapacity = 612000
client.channels.static_log_channel.minimumRequiredSpace = 524288000
client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
client.sinks.kafka_sink.kafka.topic = 數據寫入的topic ,如flume_test
client.sinks.kafka_sink.kafka.bootstrap.servers = XXX . XXX . XXX . XXX :kafka 端口號 , XXX . XXX . XXX . XXX :kafka 端口號 , XXX . XXX . XXX . XXX :kafka端口號
client.sinks.kafka_sink.flumeBatchSize = 1000
client.sinks.kafka_sink.kafka.producer.type = sync
client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka Domain名稱,安全集群必填,如hadoop.xxx.1com
client.sinks.kafka_sink.requiredAcks = 0
client.sources.static_log_source.channels = static_log_channel
client.sinks.kafka_sink.channel = static_log_channel

說明
client.sinks.kafka_sink.kafka.topic:數據寫入的topic。若(ruo)kafka中該topic不存在,默認情況下會自(zi)動創(chuang)建該topic。
client.sinks.kafka_sink.kafka.bootstrap.servers:Kafkabrokers列表(biao),多個用英文逗號分隔。默認情況下,安(an)全(quan)集群(qun)端(duan)(duan)口21007,普(pu)通(tong)集群(qun)對應端(duan)(duan)口9092。
client.sinks.kafka_sink.kafka.security.protocol:安全集(ji)群(qun)為(wei)SASL_PLAINTEXT,普通(tong)集(ji)群(qun)為(wei)PLAINTEXT。
client.sinks.kafka_sink.kafka.kerberos.domain.name:
普通(tong)集(ji)(ji)群(qun)(qun)無需配(pei)置此參數。安全集(ji)(ji)群(qun)(qun)對(dui)應此參數的值為Kafka集(ji)(ji)群(qun)(qun)中“kerberos.domain.name”對(dui)應的值。
具體可到Broker實例(li)所在節點上查看${BIGDATA_HOME}/MRS_Current/1_ X _Broker/etc/server.properties。
其中X為(wei)隨(sui)機生成的數字,請根據實際情況修(xiu)改(gai)。同時文件需要以(yi)Flume客戶(hu)(hu)端安裝用戶(hu)(hu)身份保存(cun),例如root用戶(hu)(hu)。
具體可到(dao)Broker實例所在節點上查(cha)看“${BIGDATA_HOME}/FusionInsight_Current/1_X_Broker/etc/server.properties”。
- 參數配置并保存后,Flume客戶端將自動加載“properties.properties”中配置的內容。當spoolDir生成新的日志文件,文件內容將發送到Kafka生產者,并支持Kafka消費者消費。