/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.cluster.ClusterTopic;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.supports.cluster.redis.DeviceCheckRequest;
import org.jetlinks.supports.cluster.redis.DeviceCheckResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.UnicastProcessor;

public class ClusterDeviceOperationBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(ClusterDeviceOperationBroker.class);
    private ClusterManager clusterManager;
    private String serverId;
    private Map<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>> replyProcessor = new ConcurrentHashMap<String, FluxProcessor<DeviceMessageReply, DeviceMessageReply>>();
    private Map<String, FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>> checkRequests = new ConcurrentHashMap<String, FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>>();
    private Function<Publisher<String>, Flux<DeviceStateInfo>> localStateChecker;
    private Map<String, AtomicInteger> fragmentCounter = new ConcurrentHashMap<String, AtomicInteger>();
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.warn("unhandled reply message:{}", (Object)message, (Object)error);

    public ClusterDeviceOperationBroker(ClusterManager clusterManager) {
        this.clusterManager = clusterManager;
        this.serverId = clusterManager.getCurrentServerId();
        this.init();
    }

    public void init() {
        this.clusterManager.getTopic("device:state:check:result:".concat(this.serverId)).subscribe().subscribe(msg -> Optional.ofNullable(this.checkRequests.remove(msg.getRequestId())).ifPresent(processor -> {
            processor.onNext(msg);
            processor.onComplete();
        }));
        this.clusterManager.getTopic("device:msg:reply").subscribe().subscribe(msg -> {
            if (msg instanceof DeviceMessageReply) {
                this.handleReply((DeviceMessageReply)msg);
            }
        });
    }

    public Flux<DeviceStateInfo> getDeviceState(String deviceGatewayServerId, Collection<String> deviceIdList) {
        return Flux.defer(() -> {
            if (this.serverId.equals(deviceGatewayServerId) && this.localStateChecker != null) {
                return (Publisher)this.localStateChecker.apply((Publisher<String>)Flux.fromIterable((Iterable)deviceIdList));
            }
            String uid = UUID.randomUUID().toString();
            DeviceCheckRequest request = new DeviceCheckRequest(this.serverId, uid, new ArrayList<String>(deviceIdList));
            EmitterProcessor processor = EmitterProcessor.create((boolean)true);
            this.checkRequests.put(uid, (FluxProcessor<DeviceCheckResponse, DeviceCheckResponse>)processor);
            return this.clusterManager.getTopic("device:state:checker:".concat(deviceGatewayServerId)).publish((Publisher)Mono.just((Object)request)).flatMapMany(m -> processor.flatMap(deviceCheckResponse -> Flux.fromIterable(deviceCheckResponse.getStateInfoList()))).timeout(Duration.ofSeconds(5L), (Publisher)Flux.empty());
        });
    }

    public void handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        this.localStateChecker = stateMapper;
        this.clusterManager.getTopic("device:state:checker:".concat(serverId)).subscribe().subscribe(request -> ((Mono)((Flux)stateMapper.apply((Publisher<String>)Flux.fromIterable(request.getDeviceId()))).collectList().map(resp -> new DeviceCheckResponse((List<DeviceStateInfo>)resp, request.getRequestId())).as(arg_0 -> ((ClusterTopic)this.clusterManager.getTopic("device:state:check:result:".concat(request.getFrom()))).publish(arg_0))).subscribe(len -> {
            if (len <= 0) {
                log.warn("device check reply fail");
            }
        }));
    }

    public Flux<DeviceMessageReply> handleReply(String messageId, Duration timeout) {
        return this.replyProcessor.computeIfAbsent(messageId, ignore -> UnicastProcessor.create()).timeout(timeout, (Publisher)Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT))).doFinally(signal -> {
            this.replyProcessor.remove(messageId);
            this.fragmentCounter.remove(messageId);
        });
    }

    public Mono<Integer> send(String deviceGatewayServerId, Publisher<? extends Message> message) {
        return Flux.from(message).map(msg -> msg.addHeader(Headers.sendFrom, (Object)this.clusterManager.getCurrentServerId())).flatMap(msg -> this.clusterManager.getTopic("device:msg:p2p:".concat(deviceGatewayServerId)).publish((Publisher)Mono.just((Object)msg))).takeWhile(l -> l > 0).last((Object)0);
    }

    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return this.clusterManager.getTopic("device:msg:broadcast").publish(message);
    }

    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        return this.clusterManager.getTopic("device:msg:p2p:".concat(serverId)).subscribe().cast(Message.class);
    }

    public Mono<Boolean> reply(DeviceMessageReply message) {
        return Mono.defer(() -> {
            message.addHeader(Headers.replyFrom, (Object)this.serverId);
            if (this.replyProcessor.containsKey(message.getMessageId())) {
                this.handleReply(message);
                return Mono.just((Object)true);
            }
            return this.clusterManager.getTopic("device:msg:reply").publish((Publisher)Mono.just((Object)message)).map(l -> l > 0).switchIfEmpty(Mono.just((Object)false));
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleReply(DeviceMessageReply message) {
        try {
            String messageId = message.getMessageId();
            if (StringUtils.isEmpty((Object)messageId)) {
                log.warn("reply message messageId is empty: {}", (Object)message);
                return;
            }
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                FluxProcessor<DeviceMessageReply, DeviceMessageReply> processor = this.replyProcessor.getOrDefault(partMsgId, this.replyProcessor.get(messageId));
                if (processor == null || processor.isDisposed()) {
                    this.replyProcessor.remove(partMsgId);
                    return;
                }
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                AtomicInteger counter = this.fragmentCounter.computeIfAbsent(partMsgId, r -> new AtomicInteger(partTotal));
                try {
                    processor.onNext((Object)message);
                }
                finally {
                    if (counter.decrementAndGet() <= 0 || message.getHeader(Headers.fragmentLast).orElse(false).booleanValue()) {
                        try {
                            processor.onComplete();
                        }
                        finally {
                            this.replyProcessor.remove(partMsgId);
                            this.fragmentCounter.remove(partMsgId);
                        }
                    }
                }
                return;
            }
            FluxProcessor<DeviceMessageReply, DeviceMessageReply> processor = this.replyProcessor.get(messageId);
            if (processor != null && !processor.isDisposed()) {
                processor.onNext((Object)message);
                processor.onComplete();
            } else {
                this.replyProcessor.remove(messageId);
            }
        }
        catch (Exception e) {
            this.replyFailureHandler.handle((Throwable)e, message);
        }
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}

