6月份的时候,群里有一个小伙伴分享了一道面试题,如下图:
趁着下班的功夫,拆解下这道面试题
- 可以按顺序添加任意节点形成一个任务链
- 链表且有序 或者数组(考虑到数组的扩容,直接pass调,用有序链表LinkedList)
- 添加任意节点(可以指定索引添加)
- 任务需要按顺序执行,同一个节点可能有多个任务(可以并行执行)
- 线程池执行
- 同一个节点的任务,广度优先,可以利用Queue
- 节点完成可以指定任意个数
- Queue需要包装起来
- 同时需要计数,直接利用CountDownLatch
- 每个节点都可能失败,失败后可以通过接口进行重试
- 持久化保存节点任务执行状态
- 如果强依赖,必须按顺序再走一遍,成功的可以忽略,只执行失败的
- 只将失败任务重新串联起来
- 实现监控功能,可以监控当前任务状态
- 最简单的日志输出
- 如果有kibana,直接filebeat抽取日志到es,然后在kibana里配置视图,或者定时从es里抽取失败任务,做钉钉或短信报警
- 也可以filebeat抽取到kafka,然后消费处理,报警
- 隐含的,还有一层,任务
- 可以抽象出来,业务自定义去实现
ok,分析完了,开干。
定义任务并编写执行示例
/**
* 任务接口
* @Author: yxkong
* @Date: 2022/6/1 6:24 PM
* @version: 1.0
*/
public interface ITask {
/**
* 任务执行入口
* @return
*/
TaskResult execute();
}
/**
* 任务示例,具体的任务实现
* @Author: yxkong
* @Date: 2022/6/1 6:55 PM
* @version: 1.0
*/
public class TaskExample implements ITask{
private Random random = new Random();
//业务数据
private String name;
private int status;
@Override
public TaskResult execute() {
System.out.println(String.format("%s task name:%s starting",Thread.currentThread().getName(), this.name));
//TODO 执行具体的业务逻辑
try {
//添加随机等待,确保同一层级不使用一个线程
Thread.sleep(Long.valueOf(random.nextInt(500)));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (status == 1){
System.out.println(String.format("%s task name:%s 任务执行成功",Thread.currentThread().getName(), this.name));
return new TaskResult(1,"任务执行成功");
} else {
System.out.println(String.format("%s task name:%s 任务执行失败",Thread.currentThread().getName(), this.name));
//模拟失败后,再次要执行的任务是成功的(正常,直接添加即可)
this.status = 1;
return new TaskResult(0,"任务执行失败!",this);
}
}
public TaskExample(String name, int status) {
this.name = name;
this.status = status;
}
@Override
public String toString() {
return "ITask{" +
"name='" + name + '\'' +
", status=" + status +
'}';
}
}
定义任务编排链路
先定义一个任务包装体
/**
* 任务包装体
* @Author: yxkong
* @Date: 2022/6/1 6:37 PM
* @version: 1.0
*/
public class TaskQueueWrap {
private Queue<ITask> queue ;
//至少完成个数
private int atLeast;
private TaskQueueWrap(int atLeast) {
this.queue = new ArrayDeque<>();
this.atLeast = atLeast;
}
public Queue<ITask> getQueue() {
return queue;
}
public int getAtLeast() {
return atLeast;
}
public boolean add(ITask task){
return this.queue.add(task);
}
public static TaskQueueWrap create(List<ITask> tasks,int atLeast){
TaskQueueWrap queue = new TaskQueueWrap(atLeast);
for (ITask task:tasks){
queue.add(task);
}
return queue;
}
public static TaskQueueWrap createEmpty(int atLeast){
return new TaskQueueWrap(atLeast);
}
}
定义任务链,并暴露各种操作方法,包括
addNew(...)
添加新层级任务(单个或批量)addLast(...)
在最后一个层级上添加任务addIdx(...)
指定层级添加任务execute()
链路执行
/**
* 任务链
* @Author: yxkong
* @Date: 2022/6/1 6:34 PM
* @version: 1.0
*/
public class TaskChain {
private LinkedList<TaskQueueWrap> chain;
private LinkedList<TaskCounter> counters;
public TaskChain() {
this.chain = new LinkedList<>();
this.counters = new LinkedList<>();
}
/**
* 在最后一个层级上添加任务
* @param task
* @return
*/
public boolean addLast(ITask task){
TaskQueueWrap queue = chain.getLast();
if (Objects.isNull(task) || Objects.isNull(queue)){
return Boolean.FALSE;
}
queue.add(task);
return Boolean.TRUE;
}
/**
* 添加新层级任务
* @param task
* @return
*/
public boolean addNew(ITask task, int atLeast){
if (Objects.isNull(task)){
return Boolean.FALSE;
}
TaskQueueWrap queue =TaskQueueWrap.createEmpty(atLeast);
queue.add(task);
this.chain.addLast(queue);
return Boolean.TRUE;
}
/**
* 添加新层级任务
* @param tasks
* @return
*/
public boolean addNew(List<ITask> tasks, int atLeast){
if (Objects.isNull(tasks)){
return Boolean.FALSE;
}
TaskQueueWrap queue = TaskQueueWrap.create(tasks,atLeast);
this.chain.add(queue);
return Boolean.TRUE;
}
private boolean add(TaskQueueWrap taskQueue) {
if (Objects.isNull(taskQueue)){
return Boolean.FALSE;
}
this.chain.add(taskQueue);
return Boolean.TRUE;
}
/**
* 指定层级添加任务
* @param task
* @param idx
* @return
*/
public boolean addIdx(ITask task,int idx){
if (Objects.isNull(task)){
return Boolean.FALSE;
}
if (idx > this.chain.size()){
return Boolean.FALSE;
}
if (idx == this.chain.size()){
return this.addNew(task,1);
}
this.chain.get(idx).add(task);
return Boolean.TRUE;
}
public TaskChain execute() {
//广度优先遍历
ThreadPoolExecutor executor = ThreadPoolUtils.getThreadPooll();
TaskChain failChain = new TaskChain();
//埋点1,任务开始执行
System.out.println("task starting");
for (int i = 0; i < this.chain.size(); i++) {
//层级开始埋点
System.out.println(String.format("task level:%d starting", (i+1)));
TaskQueueWrap queueWrap = this.chain.get(i);
Queue<ITask> queue = queueWrap.getQueue();
int queueSize = queue.size();
CountDownLatch count = new CountDownLatch(queueSize);
List<Future<TaskResult>> list = new ArrayList<>();
while (queue.size() != 0){
TaskThread taskThread = new TaskThread(queue.poll(),count);
list.add(executor.submit(taskThread));
}
try {
count.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<ITask> failTasks = counterAndFail(queueSize, list);
failChain.addNew(failTasks,queueWrap.getAtLeast());
//不满足至少,任务链结束
TaskCounter taskCounter = this.counters.getLast();
if (queueWrap.getAtLeast() > (taskCounter.getCounter()- taskCounter.getFailCounter())){
//将剩余的任务添加进去
for (int j = i+1; j < this.chain.size(); j++) {
failChain.add(this.chain.get(j));
}
return failChain;
}
//层级结束埋点
System.out.println(String.format("task level:%d end", (i+1)));
}
System.out.println("task end");
return failChain;
}
private List<ITask> counterAndFail(int size, List<Future<TaskResult>> list){
List<ITask> failTasks = new ArrayList<>();
try {
//计数
Integer failCounter = 0;
for (Future<TaskResult> f:list){
TaskResult result = f.get();
if (!result.isSuc()){
failCounter++;
ITask task =(ITask) result.getData();
failTasks.add(task);
}
}
counters.add(new TaskCounter(size,failCounter));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return failTasks;
}
}
同时定义了一个返回体
/**
* 任务返回状态
* @Author: yxkong
* @Date: 2022/6/1 6:18 PM
* @version: 1.0
*/
public class TaskResult {
//任务状态,1成功,0失败,失败的时候,data为原任务
private int status;
private String msg;
//任务数据
private Object data;
public TaskResult(int status, String msg, Object data) {
this.status = status;
this.msg = msg;
this.data = data;
}
public TaskResult(int status, String msg) {
this.status = status;
this.msg = msg;
}
public int getStatus() {
return status;
}
public boolean isSuc(){
if (status == 1){
return Boolean.TRUE;
}
return Boolean.FALSE;
}
public String getMsg() {
return msg;
}
public Object getData() {
return data;
}
}
计数器
/**
* 任务执行计数器
* 如果放在执行线程里,需要用AtomicInteger
* 如果在主线程里,用这个即可
* @Author: yxkong
* @Date: 2022/6/2 9:12 AM
* @version: 1.0
*/
public class TaskCounter {
private int counter;
private int failCounter;
public TaskCounter(int counter, int failCounter) {
this.counter = counter;
this.failCounter = failCounter;
}
public int getCounter() {
return counter;
}
public void setCounter(int counter) {
this.counter = counter;
}
public int getFailCounter() {
return failCounter;
}
public void setFailCounter(int failCounter) {
this.failCounter = failCounter;
}
}
定义任务线程
/**
* 任务线程
* @Author: yxkong
* @Date: 2022/6/1 6:32 PM
* @version: 1.0
*/
public class TaskThread implements Callable {
private ITask task;
private CountDownLatch downLatch;
public TaskThread(ITask task, CountDownLatch downLatch) {
this.task = task;
this.downLatch = downLatch;
}
@Override
public Object call() throws Exception {
TaskResult rst = new TaskResult(0,"执行失败!");
try {
rst = task.execute();
} catch (Exception e) {
e.printStackTrace();
}finally {
this.downLatch.countDown();
}
return rst;
}
}
测试
/**
* 测试类
* @Author: yxkong
* @Date: 2022/6/1 6:40 PM
* @version: 1.0
*/
public class TaskTest {
public static void main(String[] args) {
/**
* 任务链路的构建,可以直接从数据库里读取配置,然后构建成链
* 设计三张表,
* 一张表存任务链,描述这个任务链
* 第二张表,存储对应任务链的层级,以及对应层级的至少执行个数
* 第三张表,存储具体层级的任务,任务执行结果,执行时间,最后更新时间
* 可以再加辅助表,用于任务执行记录
*/
TaskChain chain = new TaskChain();
//第一层级添加任务
chain.addNew(new TaskExample("task1001",1),1);
chain.addLast(new TaskExample("task1002",0));
//第二层级添加任务
chain.addNew(new TaskExample("task2001",1),1);
chain.addLast(new TaskExample("task2002",0));
chain.addLast(new TaskExample("task2003",0));
//第三层级添加任务
chain.addNew(new TaskExample("task3001",0),1);
//第四层级添加任务
chain.addNew(new TaskExample("task4001",1),1);
chain.addLast(new TaskExample("task4002",0));
//给第一层级添加任务
chain.addIdx(new TaskExample("task1003",0),0);
//可以将执行失败的任务链持久化,然后接口调用再次触发
TaskChain failChain = chain.execute();
System.out.println(" 调用失败再试 ");
failChain.execute();
}
}
执行结果
task starting
task level:1 starting
pool-1-thread-2 task name:task1002 starting
pool-1-thread-1 task name:task1001 starting
pool-1-thread-3 task name:task1003 starting
pool-1-thread-2 task name:task1002 任务执行失败
pool-1-thread-1 task name:task1001 任务执行成功
pool-1-thread-3 task name:task1003 任务执行失败
task level:1 end
task level:2 starting
pool-1-thread-4 task name:task2001 starting
pool-1-thread-5 task name:task2002 starting
pool-1-thread-6 task name:task2003 starting
pool-1-thread-5 task name:task2002 任务执行失败
pool-1-thread-6 task name:task2003 任务执行失败
pool-1-thread-4 task name:task2001 任务执行成功
task level:2 end
task level:3 starting
pool-1-thread-7 task name:task3001 starting
pool-1-thread-7 task name:task3001 任务执行失败
调用失败再试
task starting
task level:1 starting
pool-1-thread-2 task name:task1003 starting
pool-1-thread-8 task name:task1002 starting
pool-1-thread-8 task name:task1002 任务执行成功
pool-1-thread-2 task name:task1003 任务执行成功
task level:1 end
task level:2 starting
pool-1-thread-1 task name:task2002 starting
pool-1-thread-3 task name:task2003 starting
pool-1-thread-3 task name:task2003 任务执行成功
pool-1-thread-1 task name:task2002 任务执行成功
task level:2 end
task level:3 starting
pool-1-thread-5 task name:task3001 starting
pool-1-thread-5 task name:task3001 任务执行成功
task level:3 end
task level:4 starting
pool-1-thread-6 task name:task4001 starting
pool-1-thread-4 task name:task4002 starting
pool-1-thread-4 task name:task4002 任务执行失败
pool-1-thread-6 task name:task4001 任务执行成功
task level:4 end
task end
文章评论