/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.balancer;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.scheduler.SchedulerSelector;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.cluster.RuleInstance;
import org.jetlinks.rule.engine.cluster.SchedulerRegistry;
import org.jetlinks.rule.engine.cluster.TaskSnapshotRepository;
import org.jetlinks.rule.engine.cluster.balancer.SchedulerLoadBalancer;
import org.jetlinks.rule.engine.defaults.ScheduleJobCompiler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Default {@link SchedulerLoadBalancer} implementation.
 *
 * <p>On {@link #setupAsync() setup} it re-schedules the task snapshots stored for every
 * local scheduler; on {@link #reBalance(List, RuleInstance, boolean) re-balance} it
 * schedules, on each given scheduler, the jobs of a rule instance that the scheduler is
 * not already running.
 */
public class DefaultSchedulerLoadBalancer implements SchedulerLoadBalancer {
    private static final Logger log = LoggerFactory.getLogger(DefaultSchedulerLoadBalancer.class);

    /** Maximum time the blocking {@link #setup()} waits for {@link #setupAsync()}. */
    private static final Duration SETUP_TIMEOUT = Duration.ofSeconds(30L);

    /** Set through {@link #setAutoReBalance(boolean)}; not read anywhere in this class. */
    private boolean autoReBalance = true;

    /** Event bus supplied at construction; not used by this class itself. */
    protected final EventBus eventBus;

    /** Source of the local schedulers restored during setup. */
    protected final SchedulerRegistry registry;

    /** Storage for task snapshots (lookup by scheduler, removal, saving). */
    protected final TaskSnapshotRepository snapshotRepository;

    /** Additional predicate consulted by {@link #canSchedule(Scheduler, ScheduleJob)}. */
    protected final SchedulerSelector schedulerSelector;

    /**
     * Creates a balancer that uses {@link SchedulerSelector#selectAll} as selector.
     *
     * @param eventBus           event bus kept for subclasses
     * @param registry           registry providing the local schedulers
     * @param snapshotRepository repository holding the task snapshots
     */
    public DefaultSchedulerLoadBalancer(EventBus eventBus,
                                        SchedulerRegistry registry,
                                        TaskSnapshotRepository snapshotRepository) {
        this(eventBus, registry, snapshotRepository, SchedulerSelector.selectAll);
    }

    /**
     * Creates a balancer with an explicit scheduler selector.
     *
     * @param eventBus           event bus kept for subclasses
     * @param registry           registry providing the local schedulers
     * @param snapshotRepository repository holding the task snapshots
     * @param schedulerSelector  selector tested in addition to the scheduler's own check
     */
    public DefaultSchedulerLoadBalancer(EventBus eventBus,
                                        SchedulerRegistry registry,
                                        TaskSnapshotRepository snapshotRepository,
                                        SchedulerSelector schedulerSelector) {
        this.eventBus = eventBus;
        this.registry = registry;
        this.snapshotRepository = snapshotRepository;
        this.schedulerSelector = schedulerSelector;
    }

    /**
     * Blocking variant of {@link #setupAsync()}; waits at most {@link #SETUP_TIMEOUT}.
     * An error signalled by {@link #setupAsync()} is rethrown by the blocking call.
     */
    public void setup() {
        setupAsync().block(SETUP_TIMEOUT);
    }

    /**
     * Restores the stored task snapshots of every local scheduler.
     *
     * @return a {@code Mono} completing when all local schedulers have been processed;
     *         errors other than per-snapshot scheduling failures are logged and propagated
     */
    public Mono<Void> setupAsync() {
        return Flux
            .fromIterable(registry.getLocalSchedulers())
            .flatMap(this::restoreTasks)
            .doOnError(err -> log.warn("restore scheduled tasks failed", err))
            .then();
    }

    /**
     * Re-schedules the snapshots stored for one scheduler. Snapshots whose job can no
     * longer be scheduled there are removed from the repository; tasks whose snapshot
     * state is {@code running} are started again.
     */
    private Mono<Void> restoreTasks(Scheduler scheduler) {
        return snapshotRepository
            .findBySchedulerId(scheduler.getId())
            .filterWhen(snapshot -> canSchedule(scheduler, snapshot.getJob())
                .flatMap(can -> {
                    if (!can) {
                        // Drop the stale snapshot and filter it out of the restore.
                        return snapshotRepository.removeTaskById(snapshot.getId()).thenReturn(false);
                    }
                    return Mono.just(true);
                }))
            .flatMap(snapshot -> scheduler
                .schedule(snapshot.getJob())
                .flatMap(task -> {
                    if (snapshot.getState() == Task.State.running) {
                        return task.start();
                    }
                    return Mono.empty();
                })
                .onErrorResume(err -> {
                    // Best effort: one failing snapshot must not abort the others.
                    log.warn("restore task snapshot [{}] in scheduler [{}] failed",
                             snapshot.getId(), scheduler.getId(), err);
                    return Mono.empty();
                }))
            .then();
    }

    /** Intentionally empty; this class holds nothing that needs releasing. */
    public void cleanup() {
    }

    /**
     * Re-balancing without a rule instance is not supported.
     *
     * @param schedulers schedulers to balance
     * @param balanceAll unused by this implementation
     * @return an empty {@code Mono} when {@code schedulers} is {@code null} or empty,
     *         otherwise a {@code Mono} failing with {@link UnsupportedOperationException}
     */
    @Override
    public Mono<Void> reBalance(List<Scheduler> schedulers, boolean balanceAll) {
        if (CollectionUtils.isEmpty(schedulers)) {
            return Mono.empty();
        }
        return Mono.error(new UnsupportedOperationException(
            "reBalance without a rule instance is not supported"));
    }

    /**
     * Schedules, on every given scheduler, the jobs of {@code instance} that the
     * scheduler is not already running, starts the newly created tasks and saves their
     * snapshots.
     *
     * <p>The rule model is compiled on subscription, so compilation failures are
     * delivered as an error signal instead of being thrown from this method.
     *
     * @param schedulers schedulers to distribute the jobs to
     * @param instance   rule instance whose model is compiled into jobs
     * @param balanceAll unused by this implementation
     * @return an empty {@code Mono} when {@code schedulers} is {@code null} or empty,
     *         otherwise a {@code Mono} completing once the snapshots have been saved
     */
    @Override
    public Mono<Void> reBalance(List<Scheduler> schedulers, RuleInstance instance, boolean balanceAll) {
        if (CollectionUtils.isEmpty(schedulers)) {
            return Mono.empty();
        }
        return Mono.defer(() -> scheduleMissingJobs(schedulers, instance));
    }

    /** Compiles the instance's jobs and schedules on each scheduler those it lacks. */
    private Mono<Void> scheduleMissingJobs(List<Scheduler> schedulers, RuleInstance instance) {
        Map<String, ScheduleJob> jobs = new ScheduleJobCompiler(instance.getId(), instance.getModel())
            .compile()
            .stream()
            .collect(Collectors.toMap(ScheduleJob::getNodeId, Function.identity()));

        return Flux
            .fromIterable(schedulers)
            .flatMap(scheduler -> {
                // Per-scheduler working copy: start with every job, drop the ones that
                // the scheduler already has a task for.
                Map<String, ScheduleJob> pending = new ConcurrentHashMap<>(jobs);
                return scheduler
                    .getSchedulingTask(instance.getId())
                    .doOnNext(task -> pending.remove(task.getJob().getNodeId()))
                    // Deferred so the remaining jobs are read only after all existing
                    // tasks have been seen.
                    .thenMany(Flux.defer(() -> createTask(scheduler, pending.values())));
            })
            .collectList()
            .flatMap(this::startAndSave);
    }

    /** Starts every newly scheduled task and saves the snapshots the tasks dump. */
    private Mono<Void> startAndSave(List<Task> tasks) {
        return Flux
            .fromIterable(tasks)
            .flatMap(task -> {
                log.info("schedule new task[id={} instanceId={} ,nodeId={},executor={}] in scheduler[{}]",
                         task.getId(),
                         task.getJob().getInstanceId(),
                         task.getJob().getNodeId(),
                         task.getJob().getExecutor(),
                         task.getSchedulerId());
                return task.start().thenReturn(task);
            })
            .flatMap(Task::dump)
            .as(snapshotRepository::saveTaskSnapshots);
    }

    /** Schedules on {@code scheduler} every job that passes {@link #canSchedule}. */
    private Flux<Task> createTask(Scheduler scheduler, Collection<ScheduleJob> jobs) {
        return Flux
            .fromIterable(jobs)
            .filterWhen(job -> canSchedule(scheduler, job))
            .flatMap(scheduler::schedule);
    }

    /**
     * Combines the scheduler's own check with the configured selector.
     *
     * @param scheduler candidate scheduler
     * @param job       job to place
     * @return {@code true} when every value emitted by the two checks is {@code true};
     *         {@code false} when any emitted value is {@code false} or neither check
     *         emits a value
     */
    protected Mono<Boolean> canSchedule(Scheduler scheduler, ScheduleJob job) {
        // NOTE(review): if only one of the two checks emits a value, that value alone
        // decides the result -- confirm this is intended.
        return Flux
            .merge(scheduler.canSchedule(job), schedulerSelector.test(scheduler, job))
            .defaultIfEmpty(false)
            .all(Boolean::booleanValue);
    }

    /**
     * Stores the auto re-balance flag.
     *
     * @param autoReBalance new flag value
     */
    public void setAutoReBalance(boolean autoReBalance) {
        this.autoReBalance = autoReBalance;
    }
}

