[toc]
目录
pom.xml
<!-- https://search.maven.org/artifact/com.dangdang/elastic-job-lite-core/2.1.5/jar -->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<!-- lombok 工具通过在代码编译时期动态的将注解替换为具体的代码, IDEA 需要添加 lombok 插件 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
</dependency>
实现 SimpleJob 接口
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String date = sdf.format(new Date());
System.out.println("时间:" + date
+ " 分片项:" + shardingContext.getShardingItem()
+ " 分片总数:" + shardingContext.getShardingTotalCount());
}
}
Main
public class App {
public static void main(String[] args) {
System.out.println("Hello World!");
new JobScheduler(zkCenter(),configuration()).init();
}
/**
* Zookeeper注册中心
* @return
*/
public static CoordinatorRegistryCenter zkCenter() {
// 集群可用逗号分割
ZookeeperConfiguration zc = new ZookeeperConfiguration(
"xx.xx.xx.xx:2181", "java-simple-job");
CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
// 注册中心初始化
crc.init();
return crc;
}
/**
* job 配置
* @return
*/
public static LiteJobConfiguration configuration() {
// job 核心配置
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(
"mySimpleJob",
"0/5 * * * * ?",
2)
.build();
// job 类型配置
JobTypeConfiguration jtc = new SimpleJobConfiguration(
jcc,
MySimpleJob.class.getCanonicalName());
// job 根的配置(LiteJobConfiguration)
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(jtc)
// 很重要属性,如果在修改时间之后,因为配置已经上传到zk,没有覆盖重写,就不会变更新的定时时间
.overwrite(true)
.build();
return ljc;
}
}
实现 DataflowJob 接口
public class MyDataflow implements DataflowJob<Order> {
List<Order> orders = new ArrayList<>();
public static final int NUMBER = 100;
/**
* 模拟 100 订单
*/
{
for (int i = 0; i < NUMBER; i++) {
Order order = new Order();
order.setOrderId(i+1);
// 未处理
order.setStatus(0);
orders.add(order);
}
}
/**
* 数据的抓取
* @param shardingContext
* @return
*/
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
// 订单号 % 分片总数 == 当前分片项
List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
.filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
.collect(Collectors.toList());
List<Order> subList = null;
if (orderList.size() > 0) {
// 截取 0 - 10 条数据
subList = orderList.subList(0, 10);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("时间:" + LocalTime.now() + " 分片项:" + shardingContext.getShardingItem() + " 抓取的数据:" + subList);
return subList;
}
/**
* 数据的处理
* @param shardingContext
* @param list
*/
@Override
public void processData(ShardingContext shardingContext, List<Order> list) {
// 标记已处理的数据
list.forEach(o -> o.setStatus(1));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("时间:" + LocalTime.now() + " 分片项:" + shardingContext.getShardingItem() + " 正在处理数据");
}
}
泛型
public class Order {
private Integer orderId;
/**
* 0未处理,1已处理
*/
private Integer status;
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", status=" + status +
'}';
}
}
Main
public class App {
public static void main(String[] args) {
System.out.println("Hello World!");
new JobScheduler(zkCenter(),configurationDataflow()).init();
}
/**
* Zookeeper注册中心
* @return
*/
public static CoordinatorRegistryCenter zkCenter() {
// 集群可用逗号分割
ZookeeperConfiguration zc = new ZookeeperConfiguration(
"129.28.198.30:2181", "java-simple-job");
CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
// 注册中心初始化
crc.init();
return crc;
}
/**
* job 配置
* @return
*/
public static LiteJobConfiguration configurationDataflow() {
// job 核心配置
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder(
"myDataflow",
"0/10 * * * * ?",
2)
.build();
// job 类型配置
JobTypeConfiguration jtc = new DataflowJobConfiguration(
jcc,
MyDataflow.class.getCanonicalName(),
// 如果填写 false 和 SimpleJob 一样 只执行一次
true
);
// job 根的配置(LiteJobConfiguration)
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(jtc)
// 很重要属性,如果在修改时间之后,因为配置已经上传到zk,没有覆盖重写,就不会变更新的定时时间
.overwrite(true)
.build();
return ljc;
}
}