/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.supports.cluster.redis;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.jetlinks.core.cluster.ClusterQueue;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.script.RedisScript;
import reactor.core.Disposable;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;

/**
 * {@link ClusterQueue} implementation backed by a Redis list stored under the queue id.
 *
 * <p>Elements can take one of two routes:
 * <ul>
 *   <li><b>local</b>: when this instance has at least one subscriber, an added element may be
 *       handed straight to the in-memory {@link #processor} (the probability is controlled by
 *       {@link #setLocalConsumerPercent(float)});</li>
 *   <li><b>remote</b>: otherwise the element is pushed onto the Redis list with {@code LPUSH} and a
 *       message is published on the channel {@code queue:data:produced:<id>}.</li>
 * </ul>
 *
 * <p>While there are local subscribers the instance listens on that channel and additionally
 * runs a 5 second timer; both triggers call {@link #doPoll()}, which drains the Redis list in
 * batches into the processor.
 *
 * <p>NOTE(review): elements are pushed with {@code LPUSH} and read back starting at index 0
 * ({@code LRANGE 0 ..} / {@code leftPop}), i.e. from the same end of the list. Confirm that this
 * ordering is intended before relying on FIFO semantics.
 *
 * @param <T> element type handled by the supplied {@link ReactiveRedisOperations}
 */
public class RedisClusterQueue<T> implements ClusterQueue<T> {
    /** Redis key of the backing list. */
    private final String id;
    /** Pub/sub channel used to announce that new data was pushed to the list. */
    private final String producedChannel;
    private final ReactiveRedisOperations<String, T> operations;
    /** Fan-out point for local subscribers; buffer size 512, auto-cancel disabled. */
    private final FluxProcessor<T, T> processor = EmitterProcessor.create(512, false);
    /** Guard ensuring that only one batch poll is in flight at a time. */
    private final AtomicBoolean polling = new AtomicBoolean(false);
    /** Subscription to {@link #producedChannel}; {@code null} while not polling. */
    private volatile Disposable disposable;
    /** Periodic fallback poll; {@code null} while not polling. */
    private volatile Disposable timer;
    private final int batchSize = 32;
    private volatile float localConsumerPercent = 1.0f;

    /**
     * Lua script that atomically reads and removes the head of the list.
     * {@code LRANGE key 0 batchSize} is inclusive on both ends, so it returns up to
     * {@code batchSize + 1} elements; {@code LTRIM key batchSize+1 -1} removes exactly those.
     */
    @SuppressWarnings("unchecked")
    private final RedisScript<List<T>> batchPollScript = RedisScript.of(
            "local val = redis.call('lrange',KEYS[1],0," + this.batchSize + ");redis.call('ltrim',KEYS[1]," + (this.batchSize + 1) + ",-1);return val;",
            (Class<List<T>>) (Class<?>) List.class);

    /** Lua script that pushes ARGV[1] onto the list and publishes ARGV[2] on the produced channel. */
    private final RedisScript<Long> pushAndPublish;

    /**
     * Sets the probability with which an added element is delivered directly to local
     * subscribers instead of being written to Redis.
     *
     * @param localConsumerPercent value compared against {@link Math#random()}; {@code 1.0}
     *                             means "always local when a subscriber exists"
     */
    public void setLocalConsumerPercent(float localConsumerPercent) {
        this.localConsumerPercent = localConsumerPercent;
    }

    /**
     * Creates a queue bound to the given Redis key.
     *
     * @param id         Redis key of the list; also used as suffix of the notification channel
     * @param operations reactive Redis operations used for all list, script and pub/sub calls
     */
    public RedisClusterQueue(String id, ReactiveRedisOperations<String, T> operations) {
        this.id = id;
        this.operations = operations;
        this.producedChannel = "queue:data:produced:".concat(id);
        // The channel name is embedded in the script text, exactly as before.
        this.pushAndPublish = RedisScript.of(
                "local val = redis.call('lpush',KEYS[1],ARGV[1]);redis.call('publish'," + "'queue:data:produced:".concat(id) + "',ARGV[2]);return val;",
                Long.class);
    }

    /**
     * Starts listening for "data produced" notifications and starts the 5 second fallback timer.
     * Does nothing if either of them is already active.
     */
    protected void startPoll() {
        if (this.disposable != null || this.timer != null) {
            return;
        }
        this.disposable = this.operations
                .listenToChannel(new String[]{this.producedChannel})
                .map(ReactiveSubscription.Message::getMessage)
                .subscribe(message -> this.doPoll());
        this.timer = Flux.interval(Duration.ofSeconds(5L)).subscribe(tick -> this.doPoll());
    }

    /**
     * Drains one batch from Redis into the local processor, unless a poll is already running.
     *
     * <p>If no local subscriber is left, polling is stopped instead. Elements that arrive after
     * the last subscriber disappeared are pushed back to Redis. When a batch comes back with at
     * least {@code batchSize} elements another poll is started right after this one finished.
     */
    protected void doPoll() {
        if (!this.polling.compareAndSet(false, true)) {
            return;
        }
        if (!this.processor.hasDownstreams()) {
            // Release the guard before bailing out; otherwise no later poll could ever start.
            this.polling.set(false);
            this.stopPoll();
            return;
        }
        AtomicBoolean batchWasFull = new AtomicBoolean(false);
        this.pollBatch()
                .doOnNext(value -> {
                    if (this.processor.hasDownstreams()) {
                        this.processor.onNext(value);
                    } else {
                        // Nobody left to consume it locally: hand it back to Redis.
                        this.operations.opsForList().leftPush(this.id, value).subscribe();
                    }
                })
                .count()
                .doOnNext(count -> batchWasFull.set(count >= (long) this.batchSize))
                .doFinally(signal -> {
                    // Release the guard first, then continue draining. Re-polling only after
                    // the flag is cleared keeps the "single poll in flight" guarantee intact.
                    this.polling.set(false);
                    if (batchWasFull.get()) {
                        this.doPoll();
                    }
                })
                .subscribe();
    }

    /** Disposes the channel listener and the timer, if present, and clears both references. */
    protected void stopPoll() {
        Disposable listener = this.disposable;
        if (listener != null) {
            listener.dispose();
            this.disposable = null;
        }
        Disposable ticker = this.timer;
        if (ticker != null) {
            ticker.dispose();
            this.timer = null;
        }
    }

    /**
     * Returns a stream of queue elements. Polling of the Redis list is started when the returned
     * flux is subscribed.
     *
     * @return flux emitting elements delivered locally or polled from Redis
     */
    @Nonnull
    public Flux<T> subscribe() {
        return this.processor
                .doOnSubscribe(subscription -> this.startPoll())
                .doFinally(signal -> {
                    // Keep polling while other local subscribers are still attached.
                    // doPoll() also stops polling by itself once it finds no downstreams.
                    if (!this.processor.hasDownstreams()) {
                        this.stopPoll();
                    }
                });
    }

    /** Stops polling Redis; see {@link #stopPoll()}. */
    public void stop() {
        this.stopPoll();
    }

    /**
     * @return the length of the Redis list, narrowed to {@code int}
     */
    public Mono<Integer> size() {
        return this.operations.opsForList().size(this.id).map(Number::intValue);
    }

    /**
     * Removes and returns a single element from the head of the Redis list.
     *
     * @return the popped element as emitted by {@code leftPop}
     */
    @Nonnull
    public Mono<T> poll() {
        return this.operations.opsForList().leftPop(this.id);
    }

    /** Executes {@link #batchPollScript} and flattens the returned list into single elements. */
    private Flux<T> pollBatch() {
        return this.operations
                .execute(this.batchPollScript, Collections.singletonList(this.id))
                .flatMap(Flux::fromIterable);
    }

    /**
     * Raw view of {@link #operations}, used where a plain {@code String} has to be passed in a
     * position that is typed as the value type {@code T}.
     */
    @SuppressWarnings("rawtypes")
    private ReactiveRedisOperations getOperations() {
        return this.operations;
    }

    /**
     * Adds every element of the publisher to the queue, either locally or via Redis.
     *
     * @param publisher source of elements
     * @return mono emitting {@code true} once all elements were processed
     */
    @SuppressWarnings("unchecked")
    public Mono<Boolean> add(Publisher<T> publisher) {
        return Flux.from(publisher)
                .flatMap(value -> {
                    if (this.processor.hasDownstreams() && Math.random() < (double) this.localConsumerPercent) {
                        this.processor.onNext(value);
                        return Mono.just(1);
                    }
                    return this.getOperations().execute(this.pushAndPublish, Arrays.asList(this.id), Arrays.asList(value, "1"));
                })
                .then(Mono.just(true));
    }

    /**
     * Adds every collection emitted by the publisher to the queue, either locally or via Redis.
     * A collection written to Redis is followed by a notification on the produced channel.
     *
     * @param publisher source of element batches
     * @return mono emitting {@code true} once all batches were processed
     */
    @SuppressWarnings("unchecked")
    public Mono<Boolean> addBatch(Publisher<? extends Collection<T>> publisher) {
        return Flux.from(publisher)
                .flatMap(batch -> {
                    if (batch.isEmpty()) {
                        // Nothing to enqueue and nothing to announce.
                        return Mono.just(1);
                    }
                    if (this.processor.hasDownstreams() && Math.random() < (double) this.localConsumerPercent) {
                        batch.forEach(this.processor::onNext);
                        return Mono.just(1);
                    }
                    // Chain the publish so that it is actually subscribed; a Mono that is merely
                    // created inside a side-effect callback is never executed.
                    return this.operations.opsForList()
                            .leftPushAll(this.id, batch)
                            .then(this.getOperations().convertAndSend(this.producedChannel, "1"));
                })
                .then(Mono.just(true));
    }
}

