/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

public class ExchangeLatchManager {
    private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
    private final IgniteLogger log;
    private final GridKernalContext ctx;
    @GridToStringExclude
    private final GridDiscoveryManager discovery;
    @GridToStringExclude
    private final GridIoManager io;
    @GridToStringExclude
    private volatile ClusterNode crd;
    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<T2<String, AffinityTopologyVersion>, Set<UUID>>();
    @GridToStringInclude
    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<T2<String, AffinityTopologyVersion>, ServerLatch>();
    @GridToStringInclude
    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<T2<String, AffinityTopologyVersion>, ClientLatch>();
    private final ReentrantLock lock = new ReentrantLock();

    public ExchangeLatchManager(GridKernalContext ctx) {
        this.ctx = ctx;
        this.log = ctx.log(this.getClass());
        this.discovery = ctx.discovery();
        this.io = ctx.io();
        if (!ctx.clientNode()) {
            ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
                if (msg instanceof LatchAckMessage) {
                    this.processAck(nodeId, (LatchAckMessage)msg);
                }
            });
            ctx.discovery().localJoinFuture().listen(f -> {
                if (f.error() == null) {
                    this.crd = this.getLatchCoordinator(AffinityTopologyVersion.NONE);
                }
            });
            ctx.event().addDiscoveryEventListener((e, cache) -> {
                assert (e != null);
                assert (e.type() == 11 || e.type() == 12) : this;
                ctx.closure().runLocalSafe(() -> this.processNodeLeft(e.eventNode()));
            }, 11, 12);
        }
    }

    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
        T2<String, AffinityTopologyVersion> latchId = new T2<String, AffinityTopologyVersion>(id, topVer);
        if (this.serverLatches.containsKey(latchId)) {
            return (Latch)this.serverLatches.get(latchId);
        }
        ServerLatch latch = new ServerLatch(id, topVer, participants);
        this.serverLatches.put(latchId, latch);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
        }
        if (this.pendingAcks.containsKey(latchId)) {
            Set acks = (Set)this.pendingAcks.get(latchId);
            for (UUID node : acks) {
                if (!latch.hasParticipant(node) || latch.hasAck(node)) continue;
                latch.ack(node);
            }
            this.pendingAcks.remove(latchId);
        }
        if (latch.isCompleted()) {
            this.serverLatches.remove(latchId);
        }
        return latch;
    }

    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
        T2<String, AffinityTopologyVersion> latchId = new T2<String, AffinityTopologyVersion>(id, topVer);
        if (this.clientLatches.containsKey(latchId)) {
            return (Latch)this.clientLatches.get(latchId);
        }
        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
        if (this.log.isDebugEnabled()) {
            this.log.debug("Client latch is created [latch=" + latchId + ", crd=" + coordinator + ", participantsSize=" + participants.size() + "]");
        }
        if (this.pendingAcks.containsKey(latchId)) {
            latch.complete();
            this.pendingAcks.remove(latchId);
        } else {
            this.clientLatches.put(latchId, latch);
        }
        return latch;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
        this.lock.lock();
        try {
            ClusterNode coordinator = this.getLatchCoordinator(topVer);
            if (coordinator == null) {
                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
                latch.complete();
                ClientLatch clientLatch = latch;
                return clientLatch;
            }
            Collection<ClusterNode> participants = this.getLatchParticipants(topVer);
            Latch latch = coordinator.isLocal() ? this.createServerLatch(id, topVer, participants) : this.createClientLatch(id, topVer, coordinator, participants);
            return latch;
        }
        finally {
            this.lock.unlock();
        }
    }

    private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
        Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE ? this.discovery.aliveServerNodes() : this.discovery.discoCache(topVer).aliveServerNodes();
        return aliveNodes.stream().filter(node -> node.version().compareTo(VERSION_SINCE) >= 0).collect(Collectors.toList());
    }

    @Nullable
    private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
        Collection<ClusterNode> aliveNodes = topVer == AffinityTopologyVersion.NONE ? this.discovery.aliveServerNodes() : this.discovery.discoCache(topVer).aliveServerNodes();
        return aliveNodes.stream().filter(node -> node.version().compareTo(VERSION_SINCE) >= 0).findFirst().orElse(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processAck(UUID from, LatchAckMessage message) {
        this.lock.lock();
        try {
            ClusterNode coordinator = this.getLatchCoordinator(AffinityTopologyVersion.NONE);
            if (coordinator == null) {
                return;
            }
            T2<String, AffinityTopologyVersion> latchId = new T2<String, AffinityTopologyVersion>(message.latchId(), message.topVer());
            if (message.isFinal()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
                }
                if (this.clientLatches.containsKey(latchId)) {
                    ClientLatch latch = (ClientLatch)this.clientLatches.remove(latchId);
                    latch.complete();
                } else if (!coordinator.isLocal()) {
                    this.pendingAcks.computeIfAbsent(latchId, id -> new GridConcurrentHashSet());
                    ((Set)this.pendingAcks.get(latchId)).add(from);
                } else if (coordinator.isLocal()) {
                    this.serverLatches.remove(latchId);
                }
            } else {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
                }
                if (this.serverLatches.containsKey(latchId)) {
                    ServerLatch latch = (ServerLatch)this.serverLatches.get(latchId);
                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
                        latch.ack(from);
                        if (latch.isCompleted()) {
                            this.serverLatches.remove(latchId);
                        }
                    }
                } else {
                    this.pendingAcks.computeIfAbsent(latchId, id -> new GridConcurrentHashSet());
                    ((Set)this.pendingAcks.get(latchId)).add(from);
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    private void becomeNewCoordinator() {
        if (this.log.isInfoEnabled()) {
            this.log.info("Become new coordinator " + this.crd.id());
        }
        ArrayList latchesToRestore = new ArrayList();
        latchesToRestore.addAll(this.pendingAcks.keySet());
        latchesToRestore.addAll(this.clientLatches.keySet());
        for (T2 latchId : latchesToRestore) {
            String id = (String)latchId.get1();
            AffinityTopologyVersion topVer = (AffinityTopologyVersion)latchId.get2();
            Collection<ClusterNode> participants = this.getLatchParticipants(topVer);
            if (participants.isEmpty()) continue;
            this.createServerLatch(id, topVer, participants);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processNodeLeft(ClusterNode left) {
        assert (this.crd != null) : "Coordinator is not initialized";
        this.lock.lock();
        try {
            CompletableLatch latch;
            ClusterNode coordinator;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Process node left " + left.id());
            }
            if ((coordinator = this.getLatchCoordinator(AffinityTopologyVersion.NONE)) == null) {
                return;
            }
            for (Map.Entry ackEntry : this.pendingAcks.entrySet()) {
                if (!((Set)ackEntry.getValue()).contains(left.id())) continue;
                ((Set)this.pendingAcks.get(ackEntry.getKey())).remove(left.id());
            }
            for (Map.Entry latchEntry : this.clientLatches.entrySet()) {
                latch = (ClientLatch)latchEntry.getValue();
                if (!((ClientLatch)latch).hasCoordinator(left.id())) continue;
                if (latch.hasParticipant(coordinator.id())) {
                    ((ClientLatch)latch).newCoordinator(coordinator);
                    continue;
                }
                AffinityTopologyVersion topVer = (AffinityTopologyVersion)((T2)latchEntry.getKey()).get2();
                assert (this.getLatchParticipants(topVer).isEmpty());
                latch.complete(new IgniteCheckedException("All latch participants are left from topology."));
                this.clientLatches.remove(latchEntry.getKey());
            }
            for (Map.Entry latchEntry : this.serverLatches.entrySet()) {
                latch = (ServerLatch)latchEntry.getValue();
                if (!latch.hasParticipant(left.id()) || ((ServerLatch)latch).hasAck(left.id())) continue;
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]");
                }
                ((ServerLatch)latch).ack(left.id());
                if (!latch.isCompleted()) continue;
                this.serverLatches.remove(latchEntry.getKey());
            }
            if (coordinator.isLocal() && this.crd.id() != coordinator.id()) {
                this.crd = coordinator;
                this.becomeNewCoordinator();
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    public String toString() {
        return S.toString(ExchangeLatchManager.class, this);
    }

    private static abstract class CompletableLatch
    implements Latch {
        @GridToStringInclude
        protected final String id;
        @GridToStringInclude
        protected final AffinityTopologyVersion topVer;
        @GridToStringExclude
        protected final Set<UUID> participants;
        @GridToStringExclude
        protected final GridFutureAdapter<?> complete = new GridFutureAdapter();

        CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
            this.id = id;
            this.topVer = topVer;
            this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet());
        }

        @Override
        public void await() throws IgniteCheckedException {
            this.complete.get();
        }

        @Override
        public void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException {
            this.complete.get(timeout, timeUnit);
        }

        boolean hasParticipant(UUID node) {
            return this.participants.contains(node);
        }

        boolean isCompleted() {
            return this.complete.isDone();
        }

        void complete() {
            this.complete.onDone();
        }

        void complete(Throwable error) {
            this.complete.onDone(error);
        }

        String latchId() {
            return this.id + "-" + this.topVer;
        }

        public String toString() {
            return S.toString(CompletableLatch.class, this);
        }
    }

    class ClientLatch
    extends CompletableLatch {
        private volatile ClusterNode coordinator;
        private boolean ackSent;

        ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
            super(id, topVer, participants);
            this.coordinator = coordinator;
        }

        private boolean hasCoordinator(UUID node) {
            return this.coordinator.id().equals(node);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void newCoordinator(ClusterNode coordinator) {
            if (ExchangeLatchManager.this.log.isDebugEnabled()) {
                ExchangeLatchManager.this.log.debug("Coordinator is changed [latch=" + this.latchId() + ", crd=" + coordinator.id() + "]");
            }
            ClientLatch clientLatch = this;
            synchronized (clientLatch) {
                this.coordinator = coordinator;
                if (this.ackSent) {
                    this.sendAck();
                }
            }
        }

        private void sendAck() {
            block3: {
                try {
                    this.ackSent = true;
                    ExchangeLatchManager.this.io.sendToGridTopic(this.coordinator, GridTopic.TOPIC_EXCHANGE, (Message)new LatchAckMessage(this.id, this.topVer, false), (byte)2);
                    if (ExchangeLatchManager.this.log.isDebugEnabled()) {
                        ExchangeLatchManager.this.log.debug("Ack is ackSent + [latch=" + this.latchId() + ", to=" + this.coordinator.id() + "]");
                    }
                }
                catch (IgniteCheckedException e) {
                    if (!ExchangeLatchManager.this.log.isDebugEnabled()) break block3;
                    ExchangeLatchManager.this.log.debug("Unable to send ack [latch=" + this.latchId() + ", to=" + this.coordinator.id() + "]: " + e.getMessage());
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void countDown() {
            if (this.isCompleted()) {
                return;
            }
            ClientLatch clientLatch = this;
            synchronized (clientLatch) {
                this.sendAck();
            }
        }

        @Override
        public String toString() {
            return S.toString(ClientLatch.class, this, "super", super.toString());
        }
    }

    class ServerLatch
    extends CompletableLatch {
        private final AtomicInteger permits;
        private final Set<UUID> acks;

        ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
            super(id, topVer, participants);
            this.acks = new GridConcurrentHashSet<UUID>();
            this.permits = new AtomicInteger(participants.size());
            this.complete.listen(f -> {
                for (ClusterNode node : participants) {
                    try {
                        if (!ExchangeLatchManager.this.discovery.alive(node)) continue;
                        ExchangeLatchManager.this.io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, (Message)new LatchAckMessage(id, topVer, true), (byte)2);
                        if (!ExchangeLatchManager.this.log.isDebugEnabled()) continue;
                        ExchangeLatchManager.this.log.debug("Final ack is ackSent [latch=" + this.latchId() + ", to=" + node.id() + "]");
                    }
                    catch (IgniteCheckedException e) {
                        if (!ExchangeLatchManager.this.log.isDebugEnabled()) continue;
                        ExchangeLatchManager.this.log.debug("Unable to send final ack [latch=" + this.latchId() + ", to=" + node.id() + "]");
                    }
                }
            });
        }

        private boolean hasAck(UUID from) {
            return this.acks.contains(from);
        }

        private void ack(UUID from) {
            if (ExchangeLatchManager.this.log.isDebugEnabled()) {
                ExchangeLatchManager.this.log.debug("Ack is accepted [latch=" + this.latchId() + ", from=" + from + "]");
            }
            this.countDown0(from);
        }

        private void countDown0(UUID node) {
            if (this.isCompleted() || this.acks.contains(node)) {
                return;
            }
            this.acks.add(node);
            int remaining = this.permits.decrementAndGet();
            if (ExchangeLatchManager.this.log.isDebugEnabled()) {
                ExchangeLatchManager.this.log.debug("Count down + [latch=" + this.latchId() + ", remaining=" + remaining + "]");
            }
            if (remaining == 0) {
                this.complete();
            }
        }

        @Override
        public void countDown() {
            this.countDown0(ExchangeLatchManager.this.ctx.localNodeId());
        }

        @Override
        public String toString() {
            Set pendingAcks = this.participants.stream().filter(ack -> !this.acks.contains(ack)).collect(Collectors.toSet());
            return S.toString(ServerLatch.class, this, "pendingAcks", pendingAcks, "super", super.toString());
        }
    }
}

