一、簡介
Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
Elastic-Job整合了Quartz和Zookeeper,封裝而成的一個分布式調度框架
Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務調度
ElasticJob Cloud 采用自(zi)研(yan) Mesos Framework 的(de)解決(jue)方案(an),額外提(ti)供(gong)(gong)資(zi)源(yuan)治理、應用分(fen)發以及(ji)進程隔(ge)離等功能。它通(tong)過彈性調度、資(zi)源(yuan)管控、以及(ji)作(zuo)業治理的(de)功能,打造(zao)一個適用于(yu)互聯網場景的(de)分(fen)布(bu)式調度解決(jue)方案(an),并通(tong)過開(kai)放的(de)架構設(she)計(ji),提(ti)供(gong)(gong)多(duo)元化(hua)的(de)作(zuo)業生(sheng)態。
二、功能列表
1、任務分片
- 將整體任務拆解為多個子任務
- 可通過服務器的增減彈性伸縮任務處理能力
- 分布式協調,任務服務器上下線的全自動發現與處理
Elastic Job中任務分片項的概念,使得任務可以在分布式的環境下運行,每條任務服務器只運行分配該服務器的分片。隨著服務器增加或宕機,Elastic Job近乎實時的感知服務器數量的變更,從而重新分配分片。
例如、新(xin)增服務(wu)(wu)器(qi)時(shi),通過注(zhu)冊(ce)中(zhong)心(xin)(xin)臨時(shi)節點的變化感(gan)知新(xin)服務(wu)(wu)器(qi),并在下次(ci)任務(wu)(wu)調(diao)度的時(shi)候重(zhong)新(xin)分片,新(xin)服務(wu)(wu)將(jiang)(jiang)承載(zai)一(yi)部分作(zuo)業。服務(wu)(wu)器(qi)宕機時(shi),注(zhu)冊(ce)中(zhong)心(xin)(xin)感(gan)知,并將(jiang)(jiang)在下次(ci)運行(xing)(xing)將(jiang)(jiang)分片轉移到存活的服務(wu)(wu)器(qi),達到高可用(yong)。本次(ci)宕機未執(zhi)行(xing)(xing)的作(zuo)業,可以通過失效轉移的方式繼續執(zhi)行(xing)(xing)。
2、多任務類型
- 基于時間驅動的任務
- 基于數據驅動的任務
- 同事支持常駐任務和瞬時任務
- 多語言任務支持
3、云原生
- 完美結合Mesos或Kubernetes等調度平臺
- 任務不依賴IP、磁盤、數據等有狀態組件
- 合理的資源調度、基于Netflix的Fenzo進行資源的分配
4、容錯性
- 支持定時自我故障檢測與自動修復
- 分布式任務分片唯一保證
- 支持失效轉移和錯過任務重觸發
彈(dan)性擴容縮容在下(xia)次(ci)作業(ye)運行(xing)(xing)前(qian)重分(fen)(fen)片,但本次(ci)作業(ye)執行(xing)(xing)的(de)(de)過(guo)程中,下(xia)線(xian)的(de)(de)服(fu)務器(qi)所分(fen)(fen)配的(de)(de)作業(ye)將不(bu)會重新被分(fen)(fen)配。失(shi)效轉移(yi)功(gong)能可以在本次(ci)作業(ye)運行(xing)(xing)中用空閑服(fu)務器(qi)抓取孤立作業(ye)分(fen)(fen)片執行(xing)(xing)。同樣(yang)失(shi)效轉移(yi)功(gong)能也會犧(xi)牲部分(fen)(fen)性能。
5、任務聚合
- 相同任務聚合至相同的執行器統一處理
- 節省系統資源和初始化開銷
- 動態調配追加資源至新分配的任務
易用性
- 完善的運維平臺
- 提供任務執行歷史數據追蹤能力
- 注冊中心數據一鍵dump用于備份與調試問題
三、快速集成
1、引入maven依賴
版(ban)本(ben)使用2.1.5,官方有一段時(shi)間沒更新elasticJob了,近期重新啟動,即將發(fa)布3.0.0版(ban)本(ben)
<properties>
<lasted.release.version>2.1.5</lasted.release.version>
</properties>
<!-- import elastic-job lite core -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${lasted.release.version}</version>
</dependency>
<!-- import other module if need -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${lasted.release.version}</version>
</dependency>
2、配置ZooKeeper
@Configuration
public class JobRegistryCenterConfig {
@Bean
public ZookeeperRegistryCenter registryCenter(){
ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("x.x.x.x:2181","elastic-job-ns");
zookeeperConfiguration.setMaxRetries(5);
ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
zookeeperRegistryCenter.init();
return zookeeperRegistryCenter;
}
}
3、配置Job調度器
@Configuration
public class MyJobConfig {
private final String cron = "1/5 * * * * ?";
private final int shardingTotalCount = 5;
private final String shardingItemParameters = "0=A,1=B,2=C";
private final String jobParameters = "parameter";
@Resource
private ZookeeperRegistryCenter regCenter;
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(final MySimpleJob simpleJob) {
return new SpringJobScheduler(simpleJob, regCenter,
getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount, shardingItemParameters));
}
/**
* 配置任務詳細信息
* @param jobClass
* @param cron
* @param shardingTotalCount
* @param shardingItemParameters
* @return
*/
private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(
JobCoreConfiguration.newBuilder(jobClass.getName(), cron, shardingTotalCount)
.shardingItemParameters(shardingItemParameters).build()
, jobClass.getCanonicalName())
).overwrite(true).build();
}
}
4、編寫任務邏輯
@Component
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
String a = String.format("Thread ID: %s, 作業分片總數: %s, " +
"當前分片項: %s.當前參數: %s," +
"作業名稱: %s.作業自定義參數: %s"
,
Thread.currentThread().getId(),
shardingContext.getShardingTotalCount(),
shardingContext.getShardingItem(),
shardingContext.getShardingParameter(),
shardingContext.getJobName(),
shardingContext.getJobParameter()
);
System.out.println(a);
}
}
5、運行效果
Thread ID: 75, 作業分片總數: 5, 當前分片項: 0.當前參數: A,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 76, 作業分片總數: 5, 當前分片項: 1.當前參數: B,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 77, 作業分片總數: 5, 當前分片項: 2.當前參數: C,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 77, 作業分片總數: 5, 當前分片項: 3.當前參數: D,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 77, 作業分片總數: 5, 當前分片項: 4.當前參數: E,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 48, 作業分片總數: 5, 當前分片項: 0.當前參數: A,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 49, 作業分片總數: 5, 當前分片項: 1.當前參數: B,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 48, 作業分片總數: 5, 當前分片項: 2.當前參數: C,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 49, 作業分片總數: 5, 當前分片項: 3.當前參數: D,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 48, 作業分片總數: 5, 當前分片項: 4.當前參數: E,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 77, 作業分片總數: 5, 當前分片項: 0.當前參數: A,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數:
Thread ID: 77, 作業分片總數: 5, 當前分片項: 1.當前參數: B,作業名稱: cn.draven.elastic.job.job.MySimpleJob.作業自定義參數: