Elastic-Job 简单 Java API 使用 | Eddie'Blog
Elastic-Job 简单 Java API 使用

Elastic-Job 简单 Java API 使用

eddie 297 2022-02-11

[toc]

目录

pom.xml

    <!-- https://search.maven.org/artifact/com.dangdang/elastic-job-lite-core/2.1.5/jar -->
    <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-core</artifactId>
      <version>2.1.5</version>
    </dependency>

    <!-- Lombok replaces its annotations with concrete code at compile time; IDEA needs the Lombok plugin installed -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.18</version>
    </dependency>

实现 SimpleJob 接口

/**
 * Simple job that reports when it ran and for which shard.
 */
public class MySimpleJob implements SimpleJob {

  /**
   * Prints one line containing the current time (formatted as
   * {@code yyyy-MM-dd HH:mm:ss}), the sharding item of this execution and the
   * total number of shards.
   *
   * @param shardingContext sharding information for this execution
   */
  @Override
  public void execute(ShardingContext shardingContext) {
    String now = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

    StringBuilder line = new StringBuilder();
    line.append("时间:").append(now);
    line.append(" 分片项:").append(shardingContext.getShardingItem());
    line.append(" 分片总数:").append(shardingContext.getShardingTotalCount());

    System.out.println(line.toString());
  }

}

Main

/**
 * Launcher for {@code MySimpleJob}: connects to the ZooKeeper registry center,
 * builds the job configuration and starts the scheduler.
 */
public class App {

  /**
   * Prints a greeting, then creates a {@code JobScheduler} from the registry
   * center and the job configuration and initializes it.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String[] args) {
    System.out.println("Hello World!");

    CoordinatorRegistryCenter registryCenter = zkCenter();
    LiteJobConfiguration jobConfiguration = configuration();

    JobScheduler scheduler = new JobScheduler(registryCenter, jobConfiguration);
    scheduler.init();
  }

  /**
   * Creates and initializes the ZooKeeper registry center.
   *
   * @return the initialized registry center
   */
  public static CoordinatorRegistryCenter zkCenter() {
    // For a cluster, several addresses can be given separated by commas
    String serverLists = "xx.xx.xx.xx:2181";
    String namespace = "java-simple-job";

    CoordinatorRegistryCenter registryCenter =
        new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverLists, namespace));

    // Initialize the registry center before handing it out
    registryCenter.init();
    return registryCenter;
  }

  /**
   * Builds the configuration of the simple job.
   *
   * @return the root (lite) job configuration
   */
  public static LiteJobConfiguration configuration() {
    // Core job configuration
    JobCoreConfiguration core =
        JobCoreConfiguration.newBuilder("mySimpleJob", "0/5 * * * * ?", 2).build();

    // Job type configuration
    JobTypeConfiguration simpleType =
        new SimpleJobConfiguration(core, MySimpleJob.class.getCanonicalName());

    // Root job configuration (LiteJobConfiguration).
    // overwrite(true) is an important property: the configuration has already been
    // uploaded to ZooKeeper, so without overwriting it a changed schedule would
    // not take effect.
    return LiteJobConfiguration.newBuilder(simpleType).overwrite(true).build();
  }

}

实现 DataflowJob 接口

/**
 * Demo dataflow job that works through an in-memory list of simulated orders.
 *
 * <p>Each fetch hands out at most {@link #BATCH_SIZE} unprocessed orders whose
 * id modulo the sharding total count equals the current sharding item; each
 * processing step marks the fetched orders as processed.
 */
public class MyDataflow implements DataflowJob<Order> {

  List<Order> orders = new ArrayList<>();

  public static final int NUMBER = 100;

  /** Maximum number of orders returned by a single {@link #fetchData} call. */
  private static final int BATCH_SIZE = 10;

  /*
   * Simulate NUMBER orders with ids 1..NUMBER, all unprocessed (status 0).
   */
  {
    for (int i = 0; i < NUMBER; i++) {
      Order order = new Order();
      order.setOrderId(i + 1);
      // Not processed yet
      order.setStatus(0);
      orders.add(order);
    }
  }

  /**
   * Fetches the next batch of unprocessed orders that belong to the current shard.
   *
   * <p>An order belongs to a shard when
   * {@code orderId % shardingTotalCount == shardingItem}. At most
   * {@link #BATCH_SIZE} orders are returned; fewer are returned when fewer are
   * left, and an empty list (never {@code null}) when none are left.
   *
   * @param shardingContext sharding information for this execution
   * @return the fetched orders, possibly empty
   */
  @Override
  public List<Order> fetchData(ShardingContext shardingContext) {
    // limit() caps the batch without failing when fewer than BATCH_SIZE orders remain
    List<Order> batch = orders.stream()
        .filter(o -> o.getStatus() == 0)
        .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()
            == shardingContext.getShardingItem())
        .limit(BATCH_SIZE)
        .collect(Collectors.toList());

    // Simulate the time it takes to fetch the data
    pause(3000);

    System.out.println("时间:" + LocalTime.now() + " 分片项:" + shardingContext.getShardingItem() + " 抓取的数据:" + batch);

    return batch;
  }

  /**
   * Processes a fetched batch by marking every order in it as processed (status 1).
   *
   * @param shardingContext sharding information for this execution
   * @param list the orders returned by {@link #fetchData}
   */
  @Override
  public void processData(ShardingContext shardingContext, List<Order> list) {

    // Mark the data as processed
    list.forEach(o -> o.setStatus(1));

    // Simulate the time it takes to process the data
    pause(5000);

    System.out.println("时间:" + LocalTime.now() + " 分片项:" + shardingContext.getShardingItem() + " 正在处理数据");
  }

  /**
   * Sleeps for the given time; if interrupted, returns early with the thread's
   * interrupt flag restored so the caller can still observe the interruption.
   *
   * @param millis time to sleep in milliseconds
   */
  private static void pause(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

泛型参数 Order（DataflowJob&lt;Order&gt; 中使用的订单实体类）

/**
 * Order entity carrying an id and a processing status.
 */
public class Order {

  /** Identifier of the order. */
  private Integer orderId;

  /** Processing status: 0 = not processed, 1 = processed. */
  private Integer status;

  /**
   * @return the order id, or {@code null} if it has not been set
   */
  public Integer getOrderId() {
    return this.orderId;
  }

  /**
   * @param orderId the order id to store
   */
  public void setOrderId(Integer orderId) {
    this.orderId = orderId;
  }

  /**
   * @return the processing status, or {@code null} if it has not been set
   */
  public Integer getStatus() {
    return this.status;
  }

  /**
   * @param status the processing status to store (0 = not processed, 1 = processed)
   */
  public void setStatus(Integer status) {
    this.status = status;
  }

  /**
   * @return a text form such as {@code Order{orderId=1, status=0}}
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("Order{");
    text.append("orderId=").append(orderId);
    text.append(", status=").append(status);
    text.append('}');
    return text.toString();
  }
}

Main

/**
 * Launcher for {@code MyDataflow}: connects to the ZooKeeper registry center,
 * builds the dataflow job configuration and starts the scheduler.
 */
public class App {

  /**
   * Prints a greeting, then creates a {@code JobScheduler} from the registry
   * center and the dataflow job configuration and initializes it.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String[] args) {
    System.out.println("Hello World!");
    new JobScheduler(zkCenter(), configurationDataflow()).init();
  }

  /**
   * Creates and initializes the ZooKeeper registry center.
   *
   * @return the initialized registry center
   */
  public static CoordinatorRegistryCenter zkCenter() {
    // Placeholder address: replace it with your own ZooKeeper server.
    // For a cluster, several addresses can be given separated by commas.
    ZookeeperConfiguration zc = new ZookeeperConfiguration(
        "xx.xx.xx.xx:2181", "java-simple-job");

    CoordinatorRegistryCenter crc = new ZookeeperRegistryCenter(zc);
    // Initialize the registry center
    crc.init();

    return crc;
  }

  /**
   * Builds the configuration of the dataflow job.
   *
   * @return the root (lite) job configuration
   */
  public static LiteJobConfiguration configurationDataflow() {
    // Core job configuration
    JobCoreConfiguration jcc = JobCoreConfiguration
        .newBuilder(
            "myDataflow",
            "0/10 * * * * ?",
            2)
        .build();

    // Job type configuration
    JobTypeConfiguration jtc = new DataflowJobConfiguration(
        jcc,
        MyDataflow.class.getCanonicalName(),
        // With false the job behaves like a SimpleJob and executes only once
        true
    );

    // Root job configuration (LiteJobConfiguration)
    LiteJobConfiguration ljc = LiteJobConfiguration
        .newBuilder(jtc)
        // Important property: the configuration has already been uploaded to
        // ZooKeeper, so without overwriting it a changed schedule would not
        // take effect
        .overwrite(true)
        .build();

    return ljc;
  }

}