/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.visor.verify;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.verify.PartitionKey;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.processors.query.QueryTypeDescriptorImpl;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Row;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.visor.verify.IndexValidationIssue;
import org.apache.ignite.internal.visor.verify.ValidateIndexesPartitionResult;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.h2.engine.Session;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.result.SearchRow;

public class ValidateIndexesClosure
implements IgniteCallable<Map<PartitionKey, ValidateIndexesPartitionResult>> {
    private static final long serialVersionUID = 0L;
    @IgniteInstanceResource
    private transient IgniteEx ignite;
    @LoggerResource
    private IgniteLogger log;
    private Set<String> cacheNames;
    private final AtomicInteger completionCntr = new AtomicInteger(0);
    private volatile ExecutorService calcExecutor;

    public ValidateIndexesClosure(Set<String> cacheNames) {
        this.cacheNames = cacheNames;
    }

    public Map<PartitionKey, ValidateIndexesPartitionResult> call() throws Exception {
        this.calcExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try {
            Map<PartitionKey, ValidateIndexesPartitionResult> map = this.call0();
            return map;
        }
        finally {
            this.calcExecutor.shutdown();
        }
    }

    private Map<PartitionKey, ValidateIndexesPartitionResult> call0() throws Exception {
        HashSet<Integer> grpIds = new HashSet<Integer>();
        HashSet<String> missingCaches = new HashSet<String>();
        if (this.cacheNames != null) {
            for (String string : this.cacheNames) {
                DynamicCacheDescriptor dynamicCacheDescriptor = this.ignite.context().cache().cacheDescriptor(string);
                if (dynamicCacheDescriptor == null) {
                    missingCaches.add(string);
                    continue;
                }
                grpIds.add(dynamicCacheDescriptor.groupId());
            }
            if (!missingCaches.isEmpty()) {
                StringBuilder strBuilder = new StringBuilder("The following caches do not exist: ");
                for (String string : missingCaches) {
                    strBuilder.append(string).append(", ");
                }
                strBuilder.delete(strBuilder.length() - 2, strBuilder.length());
                throw new IgniteException(strBuilder.toString());
            }
        } else {
            Collection groups = this.ignite.context().cache().cacheGroups();
            for (CacheGroupContext cacheGroupContext : groups) {
                if (cacheGroupContext.systemCache() || cacheGroupContext.isLocal()) continue;
                grpIds.add(cacheGroupContext.groupId());
            }
        }
        ArrayList<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>> procPartFutures = new ArrayList<Future<Map<PartitionKey, ValidateIndexesPartitionResult>>>();
        this.completionCntr.set(0);
        for (Integer n : grpIds) {
            CacheGroupContext grpCtx = this.ignite.context().cache().cacheGroup(n.intValue());
            if (grpCtx == null) continue;
            List parts = grpCtx.topology().localPartitions();
            for (GridDhtLocalPartition part : parts) {
                procPartFutures.add(this.processPartitionAsync(grpCtx, part));
            }
        }
        HashMap<PartitionKey, ValidateIndexesPartitionResult> hashMap = new HashMap<PartitionKey, ValidateIndexesPartitionResult>();
        long l = U.currentTimeMillis();
        int i = 0;
        while (i < procPartFutures.size()) {
            Future fut = (Future)procPartFutures.get(i);
            try {
                Map partRes = (Map)fut.get(1L, TimeUnit.SECONDS);
                hashMap.putAll(partRes);
                ++i;
            }
            catch (InterruptedException | ExecutionException e) {
                for (int j = i + 1; j < procPartFutures.size(); ++j) {
                    ((Future)procPartFutures.get(j)).cancel(false);
                }
                if (e instanceof InterruptedException) {
                    throw new IgniteInterruptedException((InterruptedException)e);
                }
                if (e.getCause() instanceof IgniteException) {
                    throw (IgniteException)e.getCause();
                }
                throw new IgniteException(e.getCause());
            }
            catch (TimeoutException ignored) {
                if (U.currentTimeMillis() - l <= 60000L) continue;
                l = U.currentTimeMillis();
                this.log.warning("ValidateIndexesClosure is still running, processed " + this.completionCntr.get() + " of " + procPartFutures.size() + " local partitions");
            }
        }
        return hashMap;
    }

    private Future<Map<PartitionKey, ValidateIndexesPartitionResult>> processPartitionAsync(final CacheGroupContext grpCtx, final GridDhtLocalPartition part) {
        return this.calcExecutor.submit(new Callable<Map<PartitionKey, ValidateIndexesPartitionResult>>(){

            @Override
            public Map<PartitionKey, ValidateIndexesPartitionResult> call() throws Exception {
                return ValidateIndexesClosure.this.processPartition(grpCtx, part);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Map<PartitionKey, ValidateIndexesPartitionResult> processPartition(CacheGroupContext grpCtx, GridDhtLocalPartition part) {
        ValidateIndexesPartitionResult partRes;
        if (!part.reserve()) {
            return Collections.emptyMap();
        }
        try {
            if (part.state() != GridDhtPartitionState.OWNING) {
                Map<PartitionKey, ValidateIndexesPartitionResult> map = Collections.emptyMap();
                return map;
            }
            long updateCntrBefore = part.updateCounter();
            long partSize = part.dataStore().fullSize();
            GridIterator it = grpCtx.offheap().partitionIterator(part.id());
            Object consId = this.ignite.context().discovery().localNode().consistentId();
            boolean isPrimary = part.primary(grpCtx.topology().readyTopologyVersion());
            partRes = new ValidateIndexesPartitionResult(updateCntrBefore, partSize, isPrimary, consId);
            boolean enoughIssues = false;
            long keysProcessed = 0L;
            long lastProgressLog = U.currentTimeMillis();
            while (it.hasNextX()) {
                GridCacheContext cacheCtx;
                if (enoughIssues) {
                    break;
                }
                CacheDataRow row = (CacheDataRow)it.nextX();
                int cacheId = row.cacheId() == 0 ? grpCtx.groupId() : row.cacheId();
                GridCacheContext gridCacheContext = cacheCtx = row.cacheId() == 0 ? grpCtx.singleCacheContext() : grpCtx.shared().cacheContext(row.cacheId());
                if (cacheCtx == null) {
                    throw new IgniteException("Unknown cacheId of CacheDataRow: " + cacheId);
                }
                GridQueryProcessor qryProcessor = this.ignite.context().query();
                try {
                    IgniteH2Indexing indexing;
                    GridH2Table gridH2Tbl;
                    Method m = GridQueryProcessor.class.getDeclaredMethod("typeByValue", String.class, CacheObjectContext.class, KeyCacheObject.class, CacheObject.class, Boolean.TYPE);
                    m.setAccessible(true);
                    QueryTypeDescriptorImpl res = (QueryTypeDescriptorImpl)m.invoke((Object)qryProcessor, cacheCtx.name(), cacheCtx.cacheObjectContext(), row.key(), row.value(), true);
                    if (res == null || (gridH2Tbl = (indexing = (IgniteH2Indexing)qryProcessor.getIndexing()).dataTable(cacheCtx.name(), res.tableName())) == null) continue;
                    GridH2RowDescriptor gridH2RowDesc = gridH2Tbl.rowDescriptor();
                    GridH2Row h2Row = gridH2RowDesc.createRow(row);
                    ArrayList<Index> indexes = gridH2Tbl.getIndexes();
                    for (Index idx : indexes) {
                        try {
                            Cursor cursor = idx.find((Session)null, (SearchRow)h2Row, (SearchRow)h2Row);
                            if (cursor != null && cursor.next()) continue;
                            throw new IgniteCheckedException("Key not found.");
                        }
                        catch (Throwable t) {
                            Object o = CacheObjectUtils.unwrapBinaryIfNeeded((CacheObjectValueContext)grpCtx.cacheObjectContext(), (CacheObject)row.key(), (boolean)true, (boolean)true);
                            IndexValidationIssue is = new IndexValidationIssue(o.toString(), cacheCtx.name(), idx.getName(), t);
                            this.log.error("Failed to lookup key: " + is.toString());
                            enoughIssues |= partRes.reportIssue(is);
                        }
                    }
                }
                catch (IllegalAccessException | NoSuchMethodException e) {
                    this.log.error("Failed to invoke typeByValue", (Throwable)e);
                    throw new IgniteException((Throwable)e);
                }
                catch (InvocationTargetException e) {
                    Throwable target = e.getTargetException();
                    this.log.error("Failed to invoke typeByValue", target);
                    throw new IgniteException(target);
                }
                finally {
                    ++keysProcessed;
                    if (U.currentTimeMillis() - lastProgressLog < 60000L || partSize <= 0L) continue;
                    this.log.warning("Processing partition " + part.id() + " (" + keysProcessed * 100L / partSize + "% " + keysProcessed + "/" + partSize + ")");
                    lastProgressLog = U.currentTimeMillis();
                }
            }
        }
        catch (IgniteCheckedException e) {
            U.error((IgniteLogger)this.log, (Object)("Failed to process partition [grpId=" + grpCtx.groupId() + ", partId=" + part.id() + "]"), (Throwable)e);
            Map<PartitionKey, ValidateIndexesPartitionResult> map = Collections.emptyMap();
            return map;
        }
        finally {
            part.release();
        }
        PartitionKey partKey = new PartitionKey(grpCtx.groupId(), part.id(), grpCtx.cacheOrGroupName());
        this.completionCntr.incrementAndGet();
        return Collections.singletonMap(partKey, partRes);
    }
}

