/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.BPServiceActor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataXceiverServer;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;

/**
 * Coordinates the {@link BPServiceActor}s of a single block pool: one actor is
 * created per namenode address passed to the constructor, and this class
 * tracks which of them (if any) is currently treated as the ACTIVE namenode.
 */
@InterfaceAudience.Private
class BPOfferService {
    /** Logger shared with {@link DataNode}. */
    static final Log LOG = DataNode.LOG;
    /**
     * Namespace info adopted by {@link #verifyAndSetNamespaceInfo};
     * {@code null} until that method has succeeded once.
     */
    NamespaceInfo bpNSInfo;
    /**
     * Registration stored by the first {@link #registrationSucceeded} call;
     * {@code null} before that.
     */
    volatile DatanodeRegistration bpRegistration;
    /** The datanode passed to the constructor. */
    private final DataNode dn;
    /** Actor whose namenode is currently regarded as ACTIVE, or {@code null} if there is none. */
    private BPServiceActor bpServiceToActive = null;
    /** One actor per namenode address; entries are removed by {@link #shutdownActor}. */
    private final List<BPServiceActor> bpServices = new CopyOnWriteArrayList<BPServiceActor>();
    /** Transaction id from the latest heartbeat processed for the active actor; -1 until one is seen. */
    private long lastActiveClaimTxId = -1L;

    BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
        Preconditions.checkArgument((!nnAddrs.isEmpty() ? 1 : 0) != 0, (Object)"Must pass at least one NN.");
        this.dn = dn;
        for (InetSocketAddress addr : nnAddrs) {
            this.bpServices.add(new BPServiceActor(addr, this));
        }
    }

    /**
     * Verifies that a refreshed namenode address list is identical (as a set)
     * to the addresses of the actors this service already has. Changing the
     * set of namenodes on a running datanode is not supported, so any
     * difference is rejected.
     *
     * @param addrs the newly configured namenode addresses
     * @throws IOException if {@code addrs} adds or removes any address
     *         relative to the current actors
     */
    void refreshNNList(ArrayList<InetSocketAddress> addrs) throws IOException {
        // Typed sets instead of raw HashSets: no unchecked casts are needed below.
        Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
        for (BPServiceActor actor : this.bpServices) {
            oldAddrs.add(actor.getNNSocketAddress());
        }
        Set<InetSocketAddress> newAddrs = Sets.newHashSet(addrs);
        // A non-empty symmetric difference means something was added or removed.
        if (!Sets.symmetricDifference(oldAddrs, newAddrs).isEmpty()) {
            throw new IOException("HA does not currently support adding a new standby to a running DN. Please do a rolling restart of DNs to reconfigure the list of NNs.");
        }
    }

    /**
     * @return {@code true} once a registration has been stored in
     *         {@code bpRegistration}
     */
    boolean isInitialized() {
        final DatanodeRegistration current = bpRegistration;
        return current != null;
    }

    /**
     * @return {@code true} if at least one actor reports itself alive,
     *         {@code false} if none does (including when there are no actors)
     */
    boolean isAlive() {
        for (final BPServiceActor candidate : bpServices) {
            if (candidate.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the block pool ID from the stored namespace info.
     *
     * @return the block pool ID, or {@code null} (after logging a warning with
     *         a stack trace) when no namespace info has been set yet
     */
    synchronized String getBlockPoolId() {
        if (bpNSInfo == null) {
            // The throwable is only there to capture the caller's stack in the log.
            LOG.warn("Block pool ID needed, but service not yet registered with NN", new Exception("trace"));
            return null;
        }
        return bpNSInfo.getBlockPoolID();
    }

    /**
     * @return {@code true} if namespace info (and therefore a block pool ID
     *         source) has been set
     */
    boolean hasBlockPoolId() {
        final NamespaceInfo nsInfo = getNamespaceInfo();
        return nsInfo != null;
    }

    /**
     * @return the namespace info currently stored, or {@code null} if none
     *         has been set
     */
    synchronized NamespaceInfo getNamespaceInfo() {
        return bpNSInfo;
    }

    /**
     * Describes this service by block pool ID and datanode UUID. While no
     * namespace info is set, the pool is shown as {@code <registering>} and a
     * missing or empty UUID is shown as {@code unassigned}.
     */
    @Override
    public synchronized String toString() {
        if (bpNSInfo != null) {
            return "Block pool " + getBlockPoolId() + " (Datanode Uuid " + dn.getDatanodeUuid() + ")";
        }
        final String uuid = dn.getDatanodeUuid();
        final boolean uuidMissing = uuid == null || uuid.isEmpty();
        return "Block pool <registering> (Datanode Uuid " + (uuidMissing ? "unassigned" : uuid) + ")";
    }

    /**
     * Validates {@code block} against this block pool and then asks every
     * actor to report it as bad.
     *
     * @param block the bad block; must be non-null and belong to this pool
     * @param storageUuid storage identifier forwarded to the actors
     * @param storageType storage type forwarded to the actors
     * @throws IllegalArgumentException if the block fails validation
     */
    void reportBadBlocks(ExtendedBlock block, String storageUuid, StorageType storageType) {
        checkBlock(block);
        for (final BPServiceActor target : bpServices) {
            target.reportBadBlocks(block, storageUuid, storageType);
        }
    }

    /**
     * Tells every actor, immediately, that a block has been received.
     *
     * @param block the received block; must be non-null and belong to this pool
     * @param delHint deletion hint attached to the notification; must be non-null
     * @param storageUuid storage identifier forwarded to the actors
     * @throws IllegalArgumentException if {@code block} or {@code delHint}
     *         fails validation
     */
    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint, String storageUuid) {
        checkBlock(block);
        checkDelHint(delHint);
        final ReceivedDeletedBlockInfo received = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(),
                ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
                delHint);
        for (final BPServiceActor target : bpServices) {
            target.notifyNamenodeBlockImmediately(received, storageUuid);
        }
    }

    /**
     * Ensures {@code block} is non-null and that its block pool ID equals the
     * ID of this service's block pool.
     *
     * @throws IllegalArgumentException if either check fails
     */
    private void checkBlock(ExtendedBlock block) {
        Preconditions.checkArgument(block != null, "block is null");
        Preconditions.checkArgument(
                block.getBlockPoolId().equals(getBlockPoolId()),
                "block belongs to BP %s instead of BP %s",
                block.getBlockPoolId(), getBlockPoolId());
    }

    /**
     * Ensures a deletion hint was supplied.
     *
     * @throws IllegalArgumentException if {@code delHint} is {@code null}
     */
    private void checkDelHint(String delHint) {
        final boolean present = delHint != null;
        Preconditions.checkArgument(present, "delHint is null");
    }

    /**
     * Tells every actor that a block has been deleted.
     *
     * @param block the deleted block; must be non-null and belong to this pool
     * @param storageUuid storage identifier forwarded to the actors
     * @throws IllegalArgumentException if the block fails validation
     */
    void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        final ReceivedDeletedBlockInfo deleted = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(),
                ReceivedDeletedBlockInfo.BlockStatus.DELETED_BLOCK,
                null);
        for (final BPServiceActor target : bpServices) {
            target.notifyNamenodeDeletedBlock(deleted, storageUuid);
        }
    }

    /**
     * Tells every actor, immediately, that a block is being received.
     *
     * @param block the block in progress; must be non-null and belong to this pool
     * @param storageUuid storage identifier forwarded to the actors
     * @throws IllegalArgumentException if the block fails validation
     */
    void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
        checkBlock(block);
        final ReceivedDeletedBlockInfo receiving = new ReceivedDeletedBlockInfo(
                block.getLocalBlock(),
                ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK,
                null);
        for (final BPServiceActor target : bpServices) {
            target.notifyNamenodeBlockImmediately(receiving, storageUuid);
        }
    }

    /** Starts every actor of this block pool. */
    void start() {
        for (final BPServiceActor each : bpServices) {
            each.start();
        }
    }

    /** Stops every actor of this block pool. */
    void stop() {
        for (final BPServiceActor each : bpServices) {
            each.stop();
        }
    }

    /** Calls {@code join()} on every actor of this block pool, in list order. */
    void join() {
        for (final BPServiceActor each : bpServices) {
            each.join();
        }
    }

    /**
     * @return the datanode supplied when this service was constructed
     */
    DataNode getDataNode() {
        return dn;
    }

    /**
     * Records the namespace info of a namenode, or verifies it against the
     * info already recorded.
     *
     * <p>On the first call the info is stored and the datanode is asked to
     * initialize this block pool; if that initialization does not complete
     * normally, the stored info is cleared again so a later call starts over.
     * On subsequent calls the block pool ID, namespace ID and cluster ID must
     * match the stored values.
     *
     * @param nsInfo namespace info reported by a namenode
     * @throws IOException if block pool initialization fails or any ID
     *         differs from the stored one
     */
    synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
        if (bpNSInfo != null) {
            checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), "Blockpool ID");
            checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), "Namespace ID");
            checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), "Cluster ID");
            return;
        }

        bpNSInfo = nsInfo;
        boolean initialized = false;
        try {
            dn.initBlockPool(this);
            initialized = true;
        } finally {
            // Roll back on any failure so the next attempt is treated as the first.
            if (!initialized) {
                bpNSInfo = null;
            }
        }
    }

    /**
     * Handles a successful registration by one of the actors.
     *
     * <p>The first registration is stored; later ones are only checked for a
     * matching namespace ID and cluster ID. The datanode is then notified
     * with the stored registration, and if block tokens are enabled the keys
     * exported by {@code reg} are added for this block pool.
     *
     * @param bpServiceActor the actor that registered (not used here)
     * @param reg the registration returned to that actor
     * @throws IOException if {@code reg} disagrees with the stored registration
     */
    synchronized void registrationSucceeded(BPServiceActor bpServiceActor, DatanodeRegistration reg) throws IOException {
        if (bpRegistration == null) {
            bpRegistration = reg;
        } else {
            checkNSEquality(
                    bpRegistration.getStorageInfo().getNamespaceID(),
                    reg.getStorageInfo().getNamespaceID(),
                    "namespace ID");
            checkNSEquality(
                    bpRegistration.getStorageInfo().getClusterID(),
                    reg.getStorageInfo().getClusterID(),
                    "cluster ID");
        }

        dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());

        if (dn.isBlockTokenEnabled) {
            dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), reg.getExportedKeys());
        }
    }

    /**
     * Fails if two identifiers differ.
     *
     * @param ourID the identifier already held; must be non-null
     * @param theirID the identifier just reported
     * @param idHelpText human-readable name of the identifier, used in the message
     * @throws IOException if {@code ourID} does not equal {@code theirID}
     */
    private static void checkNSEquality(Object ourID, Object theirID, String idHelpText) throws IOException {
        if (ourID.equals(theirID)) {
            return;
        }
        final String message = idHelpText + " mismatch: " + "previously connected to "
                + idHelpText + " " + ourID
                + " but now connected to " + idHelpText + " " + theirID;
        throw new IOException(message);
    }

    /**
     * Builds a registration for this block pool from the stored namespace
     * info by delegating to the datanode.
     *
     * @return the registration created by the datanode
     * @throws IllegalStateException if no namespace info has been set yet
     */
    synchronized DatanodeRegistration createRegistration() {
        // The message previously named a non-existent getRegistration(); it now names this method.
        Preconditions.checkState(this.bpNSInfo != null, "createRegistration() can only be called after initial handshake");
        return this.dn.createBPRegistration(this.bpNSInfo);
    }

    /**
     * Removes an actor from this service. If it was the active actor, the
     * active reference is cleared; if it was the last actor, the datanode is
     * asked to shut this block pool down.
     *
     * @param actor the actor being shut down
     */
    synchronized void shutdownActor(BPServiceActor actor) {
        final boolean wasActive = actor == bpServiceToActive;
        if (wasActive) {
            bpServiceToActive = null;
        }

        bpServices.remove(actor);

        final boolean noneLeft = bpServices.isEmpty();
        if (noneLeft) {
            dn.shutdownBlockPool(this);
        }
    }

    /**
     * Asks every actor to try sending an error report.
     *
     * @param errCode error code forwarded to the actors
     * @param errMsg error message forwarded to the actors
     */
    void trySendErrorReport(int errCode, String errMsg) {
        for (final BPServiceActor target : bpServices) {
            target.trySendErrorReport(errCode, errMsg);
        }
    }

    /**
     * Asks every actor to schedule a block report.
     *
     * @param delay delay value forwarded unchanged to each actor
     */
    void scheduleBlockReport(long delay) {
        for (final BPServiceActor target : bpServices) {
            target.scheduleBlockReport(delay);
        }
    }

    /**
     * Asks every actor to report a bad block held by another datanode. A
     * failure on one actor is logged and does not stop the remaining actors
     * from being tried.
     *
     * @param dnInfo the datanode holding the bad block
     * @param block the bad block
     */
    void reportRemoteBadBlock(DatanodeInfo dnInfo, ExtendedBlock block) {
        for (final BPServiceActor target : bpServices) {
            try {
                target.reportRemoteBadBlock(dnInfo, block);
            } catch (IOException ioe) {
                // Best effort per actor: log and continue with the next one.
                LOG.warn("Couldn't report bad block " + block + " to " + target, ioe);
            }
        }
    }

    /**
     * @return the namenode proxy of the actor currently regarded as active,
     *         or {@code null} if no actor is active
     */
    synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
        return bpServiceToActive == null ? null : bpServiceToActive.bpNamenode;
    }

    /**
     * @return a new list holding a copy of the current actors; changes to the
     *         returned list do not affect this service
     */
    @VisibleForTesting
    List<BPServiceActor> getBPServiceActors() {
        final List<BPServiceActor> snapshot = new ArrayList<BPServiceActor>(bpServices);
        return snapshot;
    }

    /**
     * Switches the dataset's trash handling for this block pool according to
     * the rolling-upgrade state.
     *
     * @param inProgress {@code true} to enable trash for this block pool,
     *        {@code false} to restore it
     */
    void signalRollingUpgrade(boolean inProgress) {
        if (!inProgress) {
            dn.getFSDataset().restoreTrash(getBlockPoolId());
            return;
        }
        dn.getFSDataset().enableTrash(getBlockPoolId());
    }

    /**
     * Updates which actor is regarded as active, based on the HA state a
     * namenode reported in its heartbeat.
     *
     * <ul>
     *   <li>A namenode claiming ACTIVE that is not the current active actor is
     *       accepted only if its txid is greater than the last recorded
     *       active-claim txid; otherwise the claim is logged and ignored.</li>
     *   <li>The current active actor reporting a non-ACTIVE state is dropped
     *       as active.</li>
     *   <li>Whenever the reporting actor ends up as the active one, its txid
     *       becomes the last recorded active-claim txid.</li>
     * </ul>
     *
     * @param actor the actor that received the heartbeat response
     * @param nnHaState HA state and txid reported by that actor's namenode
     */
    synchronized void updateActorStatesFromHeartbeat(BPServiceActor actor, NNHAStatusHeartbeat nnHaState) {
        final long txid = nnHaState.getTxId();
        final boolean claimsActive = nnHaState.getState() == HAServiceProtocol.HAServiceState.ACTIVE;
        final boolean alreadyActive = bpServiceToActive == actor;
        final boolean newerClaim = txid > lastActiveClaimTxId;

        if (claimsActive && !alreadyActive) {
            LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " + "txid=" + txid);
            if (!newerClaim) {
                // Stale claim: keep whatever active actor we already have.
                LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" + txid + " but there was already a more recent claim at txid=" + lastActiveClaimTxId);
                return;
            }
            if (bpServiceToActive != null) {
                LOG.info("Namenode " + actor + " taking over ACTIVE state from " + bpServiceToActive + " at higher txid=" + txid);
            } else {
                LOG.info("Acknowledging ACTIVE Namenode " + actor);
            }
            bpServiceToActive = actor;
        } else if (alreadyActive && !claimsActive) {
            LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " + "txid=" + nnHaState.getTxId());
            bpServiceToActive = null;
        }

        if (bpServiceToActive == actor) {
            assert txid >= lastActiveClaimTxId;
            lastActiveClaimTxId = txid;
        }
    }

    /**
     * @param addr a namenode socket address
     * @return {@code true} if any actor's namenode address equals {@code addr}
     */
    boolean containsNN(InetSocketAddress addr) {
        for (final BPServiceActor candidate : bpServices) {
            if (candidate.getNNSocketAddress().equals(addr)) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the number of actors currently held, one per namenode
     */
    @VisibleForTesting
    int countNameNodes() {
        final int actorCount = bpServices.size();
        return actorCount;
    }

    /**
     * Test hook: triggers a block report on every actor.
     *
     * @throws IOException if any actor's trigger fails; later actors are then
     *         not triggered
     */
    @VisibleForTesting
    void triggerBlockReportForTests() throws IOException {
        for (final BPServiceActor each : bpServices) {
            each.triggerBlockReportForTests();
        }
    }

    /**
     * Test hook: triggers a deletion report on every actor.
     *
     * @throws IOException if any actor's trigger fails; later actors are then
     *         not triggered
     */
    @VisibleForTesting
    void triggerDeletionReportForTests() throws IOException {
        for (final BPServiceActor each : bpServices) {
            each.triggerDeletionReportForTests();
        }
    }

    /**
     * Test hook: triggers a heartbeat on every actor.
     *
     * @throws IOException if any actor's trigger fails; later actors are then
     *         not triggered
     */
    @VisibleForTesting
    void triggerHeartbeatForTests() throws IOException {
        for (final BPServiceActor each : bpServices) {
            each.triggerHeartbeatForTests();
        }
    }

    /**
     * Dispatches a command received by one of the actors.
     *
     * <p>A {@code null} command is a no-op. A re-register request (action
     * code 4, logged as DNA_REGISTER) is handled by the actor itself before
     * this object's lock is taken. Every other command is processed while
     * holding the lock, as a command from the active namenode if
     * {@code actor} is the current active actor and as a command from a
     * standby otherwise.
     *
     * @param cmd the command to process; may be {@code null}
     * @param actor the actor that received the command
     * @return {@code true} on every path visible in this method
     * @throws IOException if processing the command fails
     */
    boolean processCommandFromActor(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        assert bpServices.contains(actor);

        if (cmd == null) {
            return true;
        }

        if (cmd.getAction() == 4) {
            LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr + " with " + actor.state + " state");
            actor.reRegister();
            return true;
        }

        synchronized (this) {
            return actor == bpServiceToActive
                    ? processCommandFromActive(cmd, actor)
                    : processCommandFromStandby(cmd, actor);
        }
    }

    /**
     * Formats block IDs as a comma-separated list for logging. At most
     * {@code dn.getMaxNumberOfBlocksToLog()} IDs are listed; if there are
     * more, {@code "..."} is appended directly after the last listed ID.
     *
     * @param ids the block IDs to format
     * @return the formatted, possibly truncated, list
     */
    private String blockIdArrayToString(long[] ids) {
        final long limit = dn.getMaxNumberOfBlocksToLog();
        final StringBuilder out = new StringBuilder();
        for (int idx = 0; idx < ids.length; idx++) {
            if (idx >= limit) {
                out.append("...");
                break;
            }
            if (idx > 0) {
                out.append(", ");
            }
            out.append(ids[idx]);
        }
        return out.toString();
    }

    /**
     * Executes a command that came from the actor currently regarded as
     * active. The numeric case labels are inlined action codes; the names in
     * the comments below are taken from this method's own log and exception
     * messages where available and are marked as presumed otherwise.
     *
     * @param cmd the command to execute; must be non-null
     * @param actor the actor that received the command
     * @return always {@code true}
     * @throws IOException if one of the invoked datanode operations fails
     * @throws UnsupportedOperationException for action code 3 (DNA_SHUTDOWN)
     */
    private boolean processCommandFromActive(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        // Pre-cast views of the command; each is null when cmd is not of that type.
        BlockCommand bcmd = cmd instanceof BlockCommand ? (BlockCommand)cmd : null;
        BlockIdCommand blockIdCmd = cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd : null;
        switch (cmd.getAction()) {
            // Presumably DNA_TRANSFER (TODO confirm): copy blocks to the given targets.
            case 1: {
                this.dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
                this.dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
                break;
            }
            // Presumably DNA_INVALIDATE (TODO confirm): drop the blocks from the scanner (if any) and the dataset.
            case 2: {
                Block[] toDelete = bcmd.getBlocks();
                if (this.dn.blockScanner != null) {
                    this.dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
                }
                this.dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
                this.dn.metrics.incrBlocksRemoved(toDelete.length);
                break;
            }
            // DNA_CACHE: cache the listed block IDs.
            case 9: {
                LOG.info((Object)("DatanodeCommand action: DNA_CACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_UNCACHE: uncache the listed block IDs.
            case 10: {
                LOG.info((Object)("DatanodeCommand action: DNA_UNCACHE for " + blockIdCmd.getBlockPoolId() + " of [" + this.blockIdArrayToString(blockIdCmd.getBlockIds()) + "]"));
                this.dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
                break;
            }
            // DNA_SHUTDOWN: not implemented; rejected outright.
            case 3: {
                throw new UnsupportedOperationException("Received unimplemented DNA_SHUTDOWN");
            }
            // DNA_FINALIZE: finalize the upgrade for the block pool named in the command.
            case 5: {
                String bp = ((FinalizeCommand)cmd).getBlockPoolId();
                LOG.info((Object)("Got finalize command for block pool " + bp));
                assert (this.getBlockPoolId().equals(bp)) : "BP " + this.getBlockPoolId() + " received DNA_FINALIZE " + "for other block pool " + bp;
                this.dn.finalizeUpgradeForPool(bp);
                break;
            }
            // Presumably DNA_RECOVERBLOCK (TODO confirm): start recovery of the listed blocks.
            case 6: {
                String who = "NameNode at " + actor.getNNSocketAddress();
                this.dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
                break;
            }
            // DNA_ACCESSKEYUPDATE: add the exported keys, but only when block tokens are enabled.
            case 7: {
                LOG.info((Object)"DatanodeCommand action: DNA_ACCESSKEYUPDATE");
                if (!this.dn.isBlockTokenEnabled) break;
                this.dn.blockPoolTokenSecretManager.addKeys(this.getBlockPoolId(), ((KeyUpdateCommand)cmd).getExportedKeys());
                break;
            }
            // DNA_BALANCERBANDWIDTHUPDATE: apply the new bandwidth; non-positive values are ignored.
            case 8: {
                LOG.info((Object)"DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
                long bandwidth = ((BalancerBandwidthCommand)cmd).getBalancerBandwidthValue();
                if (bandwidth <= 0L) break;
                DataXceiverServer dxcs = (DataXceiverServer)this.dn.dataXceiverServer.getRunnable();
                LOG.info((Object)("Updating balance throttler bandwidth from " + dxcs.balanceThrottler.getBandwidth() + " bytes/s " + "to: " + bandwidth + " bytes/s."));
                dxcs.balanceThrottler.setBandwidth(bandwidth);
                break;
            }
            // Any other action code is only logged.
            default: {
                LOG.warn((Object)("Unknown DatanodeCommand action: " + cmd.getAction()));
            }
        }
        return true;
    }

    /**
     * Handles a command that came from an actor that is not the active one.
     * Only access-key updates (action code 7) are applied, and only when
     * block tokens are enabled; the other known action codes are logged and
     * ignored, and unknown codes are logged as such.
     *
     * @param cmd the command received; must be non-null
     * @param actor the actor that received the command (not used here)
     * @return always {@code true}
     * @throws IOException declared for symmetry with the active path
     */
    private boolean processCommandFromStandby(DatanodeCommand cmd, BPServiceActor actor) throws IOException {
        switch (cmd.getAction()) {
            // Known commands that a standby is not allowed to issue.
            case 1:
            case 2:
            case 3:
            case 5:
            case 6:
            case 8:
            case 9:
            case 10: {
                LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
                break;
            }
            // Key updates are accepted from a standby as well.
            case 7: {
                LOG.info("DatanodeCommand action from standby: DNA_ACCESSKEYUPDATE");
                if (dn.isBlockTokenEnabled) {
                    dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), ((KeyUpdateCommand) cmd).getExportedKeys());
                }
                break;
            }
            default: {
                LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
            }
        }
        return true;
    }

    /**
     * @return {@code true} if namespace info has already been set, or
     *         otherwise if at least one actor is still alive
     */
    boolean shouldRetryInit() {
        return hasBlockPoolId() || isAlive();
    }
}

