/*
 * Decompiled with CFR 0.152.
 */
package com.zillion.util.coprocessor;

import com.zillion.database.util.DatabaseImpl_HBase;
import com.zillion.util.common.HashObject;
import com.zillion.util.common.HashObjectWrapper;
import com.zillion.util.common.HashUtil;
import com.zillion.util.coprocessor.ObserverTool;
import com.zillion.util.table.SchemaRepository;
import com.zillion.util.table.ZillionTableIndex;
import com.zillion.util.table.ZillionTableSchema;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.zookeeper.CreateMode;
import org.zillion.util.common.HexToByte;
import org.zillion.util.log.LogUtil;

public class ObserverHashThread
extends Thread {
    boolean needStop = false;

    public void RequestStop() {
        this.needStop = true;
    }

    @Override
    public void run() {
        while (!ObserverTool.container.initialized) {
            try {
                Thread.sleep(1L);
            }
            catch (Exception e1) {
                e1.printStackTrace();
            }
        }
        do {
            try {
                Thread.sleep(1000L);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
            for (String namespace : ObserverTool.container.namespaceList) {
                this.Process_namespace(namespace);
            }
        } while (!this.needStop);
    }

    private void Process_namespace(String namespace) {
        try {
            SchemaRepository repository = ObserverTool.container.repositoryMap.get(namespace);
            List<String> tableNameList = repository.TableList();
            for (String tableName : tableNameList) {
                ZillionTableSchema schema = repository.GetTable(tableName);
                this.Process_one(namespace, tableName);
                for (ZillionTableIndex schema_index : schema.Indexes) {
                    String indexName = schema_index.index_name;
                    String tableName_tmp = String.valueOf(tableName) + "." + indexName;
                    this.Process_one(namespace, tableName_tmp);
                }
            }
            HTable table_meta_data = ((DatabaseImpl_HBase)ObserverTool.container.DatabaseImpl).Get_table_meta_data(namespace);
            HTable table_meta_index = ((DatabaseImpl_HBase)ObserverTool.container.DatabaseImpl).Get_table_meta_index(namespace);
            ObserverTool.MergeHash(ObserverTool.container.ZK, namespace, table_meta_data, table_meta_index, repository, true);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void Process_one(String namespace, String tableName_tmp) throws Exception {
        String zkpath_namespace = "/zillion/ns/" + namespace;
        String tablename_real = String.valueOf(namespace) + ":" + tableName_tmp;
        if (ObserverTool.hash_tmpMap.containsKey(tablename_real)) {
            String tableName = tableName_tmp;
            int index = tableName_tmp.indexOf(46);
            if (index != -1) {
                tableName = tableName_tmp.substring(0, index);
            }
            SchemaRepository repository = ObserverTool.container.repositoryMap.get(namespace);
            ZillionTableSchema schema = repository.GetTable(tableName);
            ConcurrentLinkedQueue queue = (ConcurrentLinkedQueue)ObserverTool.hash_tmpMap.get(tablename_real);
            if (schema.table_type != null && schema.table_type.startsWith("split_")) {
                HashObjectWrapper wrapper;
                HashMap<String, HashObject> hashObjectMap = new HashMap<String, HashObject>();
                while ((wrapper = (HashObjectWrapper)queue.poll()) != null) {
                    if (!hashObjectMap.containsKey(wrapper.child_name)) {
                        hashObjectMap.put(wrapper.child_name, new HashObject());
                    }
                    HashObject hashObject = (HashObject)hashObjectMap.get(wrapper.child_name);
                    LogUtil.debug(" **********************************  hash step 2 " + tablename_real + "/" + wrapper.child_name + "\t" + hashObject.insert_count + "+" + wrapper.hashObject.insert_count + "=" + (hashObject.insert_count + wrapper.hashObject.insert_count) + "\t" + hashObject.delete_count + "+" + wrapper.hashObject.delete_count + "=" + (hashObject.delete_count + wrapper.hashObject.delete_count));
                    hashObject.insert_count += wrapper.hashObject.insert_count;
                    hashObject.delete_count += wrapper.hashObject.delete_count;
                    hashObject.hash_code = HashUtil.merge_hash_code(hashObject.hash_code, wrapper.hashObject.hash_code);
                }
                Object[] child_nameArray = hashObjectMap.keySet().toArray(new String[0]);
                Arrays.sort(child_nameArray);
                Object[] objectArray = child_nameArray;
                int n = child_nameArray.length;
                int n2 = 0;
                while (n2 < n) {
                    Object child_name = objectArray[n2];
                    HashObject hashObject = (HashObject)hashObjectMap.get(child_name);
                    if (hashObject.insert_count > 0L || hashObject.delete_count > 0L) {
                        ObserverTool.container.ZK.create(String.valueOf(zkpath_namespace) + "/hash_tmp" + "/" + tableName_tmp + "/" + (String)child_name + "/" + "tmp-", hashObject.toBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
                    }
                    ++n2;
                }
            } else {
                HashObjectWrapper wrapper;
                HashObject hashObject = new HashObject();
                while ((wrapper = (HashObjectWrapper)queue.poll()) != null) {
                    LogUtil.debug(" **********************************  hashmerge " + tablename_real);
                    LogUtil.debug(" ********************************** " + hashObject.insert_count + "\t" + hashObject.delete_count + "\t" + HexToByte.byteToHex(hashObject.hash_code));
                    LogUtil.debug(" ********************************** " + wrapper.hashObject.insert_count + "\t" + wrapper.hashObject.delete_count + "\t" + HexToByte.byteToHex(wrapper.hashObject.hash_code));
                    hashObject.insert_count += wrapper.hashObject.insert_count;
                    hashObject.delete_count += wrapper.hashObject.delete_count;
                    hashObject.hash_code = HashUtil.merge_hash_code(hashObject.hash_code, wrapper.hashObject.hash_code);
                    LogUtil.debug(" ********************************** " + hashObject.insert_count + "\t" + hashObject.delete_count + "\t" + HexToByte.byteToHex(hashObject.hash_code));
                }
                if (hashObject.insert_count > 0L || hashObject.delete_count > 0L) {
                    ObserverTool.container.ZK.create(String.valueOf(zkpath_namespace) + "/hash_tmp" + "/" + tableName_tmp + "/" + "tmp-", hashObject.toBytes(), CreateMode.PERSISTENT_SEQUENTIAL);
                }
            }
        }
    }
}

