/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.IgniteSpiException;

class GridDhtPartitionSupplier {
    private final CacheGroupContext grp;
    private final IgniteLogger log;
    private GridDhtPartitionTopology top;
    private IgnitePredicate<GridCacheEntryInfo> preloadPred;
    private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext>();

    GridDhtPartitionSupplier(CacheGroupContext grp) {
        assert (grp != null);
        this.grp = grp;
        this.log = grp.shared().logger(this.getClass());
        this.top = grp.topology();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void stop() {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = this.scMap.keySet().iterator();
            while (it.hasNext()) {
                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
                GridDhtPartitionSupplier.clearContext(this.scMap.get(t), this.log);
                it.remove();
            }
        }
    }

    private static void clearContext(SupplyContext sc, IgniteLogger log) {
        IgniteRebalanceIterator it;
        if (sc != null && (it = sc.iterator) != null && !it.isClosed()) {
            try {
                it.close();
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Iterator close failed.", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void onTopologyChanged(AffinityTopologyVersion topVer) {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = this.scMap.keySet().iterator();
            while (it.hasNext()) {
                T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
                if (topVer.compareTo((AffinityTopologyVersion)t.get3()) <= 0) continue;
                GridDhtPartitionSupplier.clearContext(this.scMap.get(t), this.log);
                it.remove();
                if (!this.log.isDebugEnabled()) continue;
                this.log.debug("Supply context removed [node=" + t.get1() + "]");
            }
        }
    }

    void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
        this.preloadPred = preloadPred;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemandMessage d) {
        block50: {
            ClusterNode node;
            AffinityTopologyVersion demTop;
            assert (d != null);
            assert (nodeId != null);
            AffinityTopologyVersion curTop = this.grp.affinity().lastVersion();
            if (curTop.compareTo(demTop = d.topologyVersion()) > 0) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Demand request outdated [grp=" + this.grp.cacheOrGroupName() + ", currentTopVer=" + curTop + ", demandTopVer=" + demTop + ", from=" + nodeId + ", topicId=" + topicId + "]");
                }
                return;
            }
            T3<UUID, Integer, AffinityTopologyVersion> contextId = new T3<UUID, Integer, AffinityTopologyVersion>(nodeId, topicId, demTop);
            if (d.rebalanceId() < 0L) {
                Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
                synchronized (map) {
                    SupplyContext sctx = this.scMap.get(contextId);
                    if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) {
                        GridDhtPartitionSupplier.clearContext(this.scMap.remove(contextId), this.log);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Supply context cleaned [grp=" + this.grp.cacheOrGroupName() + ", from=" + nodeId + ", demandMsg=" + d + ", supplyContext=" + sctx + "]");
                        }
                    } else if (this.log.isDebugEnabled()) {
                        this.log.debug("Stale supply context cleanup message [grp=" + this.grp.cacheOrGroupName() + ", from=" + nodeId + ", demandMsg=" + d + ", supplyContext=" + sctx + "]");
                    }
                    return;
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Demand request accepted [grp=" + this.grp.cacheOrGroupName() + ", from=" + nodeId + ", currentVer=" + curTop + ", demandedVer=" + demTop + ", topicId=" + topicId + "]");
            }
            if ((node = this.grp.shared().discovery().node(nodeId)) == null) {
                return;
            }
            try {
                GridDhtLocalPartition loc;
                HashSet<Integer> remainingParts;
                IgniteRebalanceIterator iter;
                SupplyContext sctx;
                Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
                synchronized (map) {
                    sctx = this.scMap.remove(contextId);
                    if (sctx != null && d.rebalanceId() < sctx.rebalanceId) {
                        this.scMap.put(contextId, sctx);
                        if (this.log.isDebugEnabled()) {
                            this.log.debug("Stale demand message [grp=" + this.grp.cacheOrGroupName() + ", actualContext=" + sctx + ", from=" + nodeId + ", demandMsg=" + d + "]");
                        }
                        return;
                    }
                }
                if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Empty demand message [grp=" + this.grp.cacheOrGroupName() + ", from=" + nodeId + ", topicId=" + topicId + ", demandMsg=" + d + "]");
                    }
                    return;
                }
                assert (sctx == null || d.partitions().isEmpty());
                long batchesCnt = 0L;
                long maxBatchesCnt = this.grp.config().getRebalanceBatchesPrefetchCount();
                if (sctx != null) {
                    maxBatchesCnt = 1L;
                } else if (this.log.isDebugEnabled()) {
                    this.log.debug("Starting supplying rebalancing [cache=" + this.grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + ", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() + ", topicId=" + topicId + "]");
                }
                GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage(d.rebalanceId(), this.grp.groupId(), d.topologyVersion(), this.grp.deploymentEnabled());
                if (sctx == null || sctx.iterator == null) {
                    iter = this.grp.offheap().rebalanceIterator(d.partitions(), d.topologyVersion());
                    remainingParts = new HashSet<Integer>(d.partitions().fullSet());
                    CachePartitionPartialCountersMap histMap = d.partitions().historicalMap();
                    for (int i = 0; i < histMap.size(); ++i) {
                        int p = histMap.partitionAt(i);
                        remainingParts.add(p);
                    }
                    for (Integer part : d.partitions().fullSet()) {
                        if (iter.isPartitionMissing(part)) continue;
                        loc = this.top.localPartition(part, d.topologyVersion(), false);
                        assert (loc != null && loc.state() == GridDhtPartitionState.OWNING) : "Partition should be in OWNING state: " + loc;
                        s.addEstimatedKeysCount(this.grp.offheap().totalPartitionEntriesCount(part));
                    }
                    for (int i = 0; i < histMap.size(); ++i) {
                        int p = histMap.partitionAt(i);
                        if (iter.isPartitionMissing(p)) continue;
                        s.addEstimatedKeysCount(histMap.updateCounterAt(i) - histMap.initialUpdateCounterAt(i));
                    }
                } else {
                    iter = sctx.iterator;
                    remainingParts = sctx.remainingParts;
                }
                int messageMaxSize = this.grp.config().getRebalanceBatchSize();
                while (iter.hasNext()) {
                    if (s.messageSize() >= messageMaxSize) {
                        if (++batchesCnt >= maxBatchesCnt) {
                            this.saveSupplyContext(contextId, iter, remainingParts, d.rebalanceId());
                            this.reply(node, d, s, contextId);
                            return;
                        }
                        if (!this.reply(node, d, s, contextId)) {
                            return;
                        }
                        s = new GridDhtPartitionSupplyMessage(d.rebalanceId(), this.grp.groupId(), d.topologyVersion(), this.grp.deploymentEnabled());
                    }
                    CacheDataRow row = (CacheDataRow)iter.next();
                    int part = row.partition();
                    loc = this.top.localPartition(part, d.topologyVersion(), false);
                    assert (loc != null && loc.state() == GridDhtPartitionState.OWNING && loc.reservations() > 0 || iter.isPartitionMissing(part)) : "Partition should be in OWNING state and has at least 1 reservation " + loc;
                    if (iter.isPartitionMissing(part) && remainingParts.contains(part)) {
                        s.missed(part);
                        remainingParts.remove(part);
                        if (!this.log.isDebugEnabled()) continue;
                        this.log.debug("Requested partition is marked as missing on local node [part=" + part + ", demander=" + nodeId + ']');
                        continue;
                    }
                    if (!remainingParts.contains(part)) continue;
                    GridCacheEntryInfo info = new GridCacheEntryInfo();
                    info.key(row.key());
                    info.expireTime(row.expireTime());
                    info.version(row.version());
                    info.value(row.value());
                    info.cacheId(row.cacheId());
                    if (this.preloadPred == null || this.preloadPred.apply(info)) {
                        s.addEntry0(part, iter.historical(part), info, this.grp.shared(), this.grp.cacheObjectContext());
                    } else if (this.log.isDebugEnabled()) {
                        this.log.debug("Rebalance predicate evaluated to false (will not send cache entry): " + info);
                    }
                    if (!iter.isPartitionDone(part)) continue;
                    s.last(part, loc.updateCounter());
                    remainingParts.remove(part);
                }
                Iterator remainingIter = remainingParts.iterator();
                while (remainingIter.hasNext()) {
                    int p = (Integer)remainingIter.next();
                    if (iter.isPartitionDone(p)) {
                        loc = this.top.localPartition(p, d.topologyVersion(), false);
                        assert (loc != null) : "Supply partition is gone: grp=" + this.grp.cacheOrGroupName() + ", p=" + p;
                        s.last(p, loc.updateCounter());
                        remainingIter.remove();
                        continue;
                    }
                    if (!iter.isPartitionMissing(p)) continue;
                    s.missed(p);
                    remainingIter.remove();
                }
                assert (remainingParts.isEmpty()) : "Partitions after rebalance should be either done or missing: " + remainingParts;
                if (sctx != null) {
                    GridDhtPartitionSupplier.clearContext(sctx, this.log);
                } else {
                    iter.close();
                }
                this.reply(node, d, s, contextId);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Finished supplying rebalancing [cache=" + this.grp.cacheOrGroupName() + ", fromNode=" + node.id() + ", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() + ", topicId=" + topicId + "]");
                }
            }
            catch (IgniteCheckedException e) {
                U.error(this.log, "Failed to send partition supply message to node: " + nodeId, e);
            }
            catch (IgniteSpiException e) {
                if (!this.log.isDebugEnabled()) break block50;
                this.log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + ", msg=" + e.getMessage() + ']');
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean reply(ClusterNode node, GridDhtPartitionDemandMessage d, GridDhtPartitionSupplyMessage s, T3<UUID, Integer, AffinityTopologyVersion> contextId) throws IgniteCheckedException {
        try {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Replying to partition demand [node=" + node.id() + ", demand=" + d + ", supply=" + s + ']');
            }
            this.grp.shared().io().sendOrderedMessage(node, d.topic(), s, this.grp.ioPolicy(), d.timeout());
            if (this.grp.config().getRebalanceThrottle() > 0L) {
                U.sleep(this.grp.config().getRebalanceThrottle());
            }
            return true;
        }
        catch (ClusterTopologyCheckedException ignore) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Failed to send partition supply message because node left grid: " + node.id());
            }
            Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
            synchronized (map) {
                GridDhtPartitionSupplier.clearContext(this.scMap.remove(contextId), this.log);
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void saveSupplyContext(T3<UUID, Integer, AffinityTopologyVersion> contextId, IgniteRebalanceIterator entryIt, Set<Integer> remainingParts, long rebalanceId) {
        Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> map = this.scMap;
        synchronized (map) {
            assert (this.scMap.get(contextId) == null);
            this.scMap.put(contextId, new SupplyContext(entryIt, remainingParts, rebalanceId));
        }
    }

    private static class SupplyContext {
        @GridToStringExclude
        private final IgniteRebalanceIterator iterator;
        private final Set<Integer> remainingParts;
        private final long rebalanceId;

        SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> remainingParts, long rebalanceId) {
            this.iterator = iterator;
            this.remainingParts = remainingParts;
            this.rebalanceId = rebalanceId;
        }

        public String toString() {
            return S.toString(SupplyContext.class, this);
        }
    }
}

