/*
 * Decompiled with CFR 0.152.
 */
package com.zillion.util.coprocessor;

import com.zillion.database.util.DatabaseImpl_HBase;
import com.zillion.util.common.ByteToObject;
import com.zillion.util.common.HashObject;
import com.zillion.util.common.HashObjectWrapper;
import com.zillion.util.common.HashUtil;
import com.zillion.util.coprocessor.ObserverHashThread;
import com.zillion.util.coprocessor.ObserverMetaThread;
import com.zillion.util.coprocessor.StatTableWrapper;
import com.zillion.util.coprocessor.StatThread;
import com.zillion.util.table.SchemaRepository;
import com.zillion.util.table.SchemaRepositoryContainer;
import com.zillion.util.table.SchemaRepositoryHelper;
import com.zillion.util.table.ZillionTableIndex;
import com.zillion.util.table.ZillionTableSchema;
import com.zillion.util.zk.ZKLockRW;
import com.zillion.util.zk.ZKWrapper;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.zookeeper.data.Stat;
import org.zillion.util.format.FormatStaticUtil;
import org.zillion.util.log.LogUtil;

/**
 * Static entry points and shared state for the observer side of the coprocessor.
 *
 * <p>This class does two things that are visible in this file:
 * <ul>
 *   <li>{@link #Start(Configuration)} launches the three background threads
 *       ({@code ObserverMetaThread}, {@code ObserverHashThread}, {@code StatThread})
 *       at most once per class load.</li>
 *   <li>{@link #MergeHash} folds the temporary hash nodes stored under
 *       {@code /zillion/ns/<namespace>/hash_tmp} into the matching nodes under
 *       {@code /zillion/ns/<namespace>/hash_exist} and mirrors the merged result
 *       into the HBase meta tables.</li>
 * </ul>
 *
 * <p>The public static fields are not read inside this class (apart from
 * {@link #full_name} and {@link #container}); presumably they are consumed by the
 * worker threads and other coprocessor classes - verify against those callers.
 */
public class ObserverTool {
    // NOTE(review): presumably milliseconds (5 min / 1 min); not used in this class - confirm in StatThread.
    public static long stat_interval = 300000L;
    public static long stat_lag = 60000L;
    /** Shared container; {@link #MergeHash} reads its {@code DatabaseImpl} and {@code ZK} members. */
    public static SchemaRepositoryContainer container = new SchemaRepositoryContainer();
    public static boolean print = true;
    public static boolean observer = true;
    /** Set to {@code true} by {@link #Start(Configuration)} once the worker threads have been started. */
    public static boolean started = false;
    public static ConcurrentMap<String, ConcurrentLinkedQueue<HashObjectWrapper>> hash_tmpMap = new ConcurrentHashMap<String, ConcurrentLinkedQueue<HashObjectWrapper>>();
    public static ConcurrentMap<String, StatTableWrapper> statMap = new ConcurrentHashMap<String, StatTableWrapper>();
    /**
     * Identity of this process: {@code "<local host address>:<random int in [0, 65536)>"}.
     * Stays {@code "undefined"} when the local address cannot be resolved.
     * Passed to {@code ZKLockRW.Create} as the lock owner in {@link #MergeHash}.
     */
    public static String full_name = "undefined";
    /** Helper bound to {@link #full_name}; always non-null after class initialization. */
    public static SchemaRepositoryHelper helper;

    static {
        try {
            full_name = InetAddress.getLocalHost().getHostAddress() + ":" + new Random().nextInt(65536);
        }
        catch (UnknownHostException e) {
            // Keep the "undefined" placeholder but report the failure through the project logger
            // instead of dumping a stack trace to stderr.
            LogUtil.info(" ********************************** ObserverTool\tlocal address lookup failed, using identity " + full_name + ": " + e);
        }
        // Build the helper in every case so that later users never hit a null helper
        // merely because the host lookup failed.
        helper = new SchemaRepositoryHelper(full_name);
    }

    /**
     * Merges all pending temporary hashes of every table (and every index of every table)
     * known to {@code repository} into their persistent hash nodes.
     *
     * <p>For each table a write lock node is created under
     * {@code /zillion/ns/<namespace>/hashlocks/<table>}. If a single wait attempt on that
     * lock does not succeed the table is skipped; the lock node is always released in a
     * {@code finally} block. Tables whose {@code table_type} starts with {@code "split_"}
     * are merged per partition (see {@link #mergeSplitNode}); all other tables are merged
     * as a single node (see {@link #mergeFlatNode}). The same rule is applied to each index,
     * using the node name {@code <table>.<index>}.
     *
     * @param zookeeper        ZooKeeper wrapper used for locking and for reading/writing hash nodes
     * @param namespace        namespace whose ZooKeeper subtree {@code /zillion/ns/<namespace>} is processed
     * @param table_meta_data  HBase table receiving the merged hash row of each table
     * @param table_meta_index HBase table receiving the merged hash row of each index
     * @param repository       source of the table list and table schemas
     * @param print            when {@code true}, lock progress is written to the debug log
     * @throws Exception any failure raised by the ZooKeeper, HBase or schema calls is propagated
     */
    public static void MergeHash(ZKWrapper zookeeper, String namespace, HTable table_meta_data, HTable table_meta_index, SchemaRepository repository, boolean print) throws Exception {
        String method_name = "MergeHash";
        String zkpath_namespace = "/zillion/ns/" + namespace;
        for (String tableName : repository.TableList()) {
            String parent_path_table = zkpath_namespace + "/hashlocks/" + tableName;
            String node_path_table = ZKLockRW.Create(zookeeper, parent_path_table, "w", full_name);
            if (print) {
                LogUtil.debug(" ********************************** " + method_name + " tablelock:" + " WaitFor " + node_path_table);
            }
            try {
                boolean wait_success = ZKLockRW.WaitOnce(zookeeper, parent_path_table, node_path_table);
                if (print) {
                    LogUtil.debug(" ********************************** " + method_name + " tablelock:" + " WaitFor " + wait_success + " " + node_path_table);
                }
                if (!wait_success) {
                    // Lock not obtained on this attempt: skip the table (the finally block still releases our node).
                    continue;
                }
                ZillionTableSchema schema = repository.GetTable(tableName);
                boolean split = schema.table_type != null && schema.table_type.startsWith("split_");

                // 1) The table's own hash.
                if (split) {
                    // NOTE(review): the partition listing uses schema.table_name while every other
                    // path uses tableName; kept as-is - confirm the two are always identical.
                    mergeSplitNode(zookeeper, namespace, schema.table_name, tableName, table_meta_data, false);
                } else {
                    mergeFlatNode(zookeeper, namespace, tableName, table_meta_data);
                }

                // 2) One hash per index, stored under the node name "<table>.<index>".
                for (int i = 0; i < schema.Indexes.size(); ++i) {
                    ZillionTableIndex index = schema.Indexes.get(i);
                    String indexNode = tableName + "." + index.index_name;
                    if (split) {
                        // Split indexes additionally emit one debug line per merged temp node.
                        mergeSplitNode(zookeeper, namespace, indexNode, indexNode, table_meta_index, true);
                    } else {
                        mergeFlatNode(zookeeper, namespace, indexNode, table_meta_index);
                    }
                }
            }
            finally {
                ZKLockRW.Finish(zookeeper, node_path_table);
                if (print) {
                    LogUtil.debug(" ********************************** " + method_name + " tablelock:" + " finish " + node_path_table);
                }
            }
        }
    }

    /**
     * Merges the temporary hashes of a non-split node into its persistent hash.
     *
     * <p>Reads {@code hash_exist/<nodeName>}, merges every child of {@code hash_tmp/<nodeName>}
     * into it, writes the result back to ZooKeeper, mirrors it into {@code metaTable} under row
     * key {@code nodeName}, and finally deletes the merged temp children. Does nothing beyond
     * the two reads when there are no temp children.
     */
    private static void mergeFlatNode(ZKWrapper zookeeper, String namespace, String nodeName, HTable metaTable) throws Exception {
        String existPath = "/zillion/ns/" + namespace + "/hash_exist/" + nodeName;
        String tmpPath = "/zillion/ns/" + namespace + "/hash_tmp/" + nodeName;

        byte[] hash = zookeeper.getData(existPath, false, new Stat());
        List<String> children = zookeeper.getChildren(tmpPath, false);
        if (children.isEmpty()) {
            return;
        }
        for (String child : children) {
            byte[] tmphash = zookeeper.getData(tmpPath + "/" + child, false, new Stat());
            hash = HashUtil.merge_hash(hash, tmphash);
        }
        // NOTE(review): version -1 presumably means "match any version" as in the ZooKeeper API - confirm for ZKWrapper.
        zookeeper.setData(existPath, hash, -1);
        writeHashRow(metaTable, nodeName, hash);
        // Temp nodes are removed only after both the ZooKeeper and HBase writes have gone through.
        deleteChildren(zookeeper, tmpPath, children);
    }

    /**
     * Merges the temporary hashes of a split (partitioned) node.
     *
     * <p>For every partition listed under {@code hash_exist/<partitionListNode>} (in sorted order),
     * the partition's temp children are merged into the partition hash, which is written back to
     * ZooKeeper and to the split meta table under row key {@code <nodeName>_<partition>}; the temp
     * children are then deleted. Every temp hash is also merged into the node-level total, which is
     * written to ZooKeeper and to {@code totalTable} (row key {@code nodeName}) only if at least one
     * partition had temp children.
     *
     * @param partitionListNode node name under {@code hash_exist} whose children name the partitions
     * @param nodeName          node name used for all data paths and row keys
     * @param totalTable        HBase table receiving the node-level total row
     * @param traceMerges       when {@code true}, logs the insert/delete counts of each merge step
     */
    private static void mergeSplitNode(ZKWrapper zookeeper, String namespace, String partitionListNode, String nodeName, HTable totalTable, boolean traceMerges) throws Exception {
        String existRoot = "/zillion/ns/" + namespace + "/hash_exist/";
        String existPath = existRoot + nodeName;
        String tmpPath = "/zillion/ns/" + namespace + "/hash_tmp/" + nodeName;

        HTable table_meta_split = ((DatabaseImpl_HBase)ObserverTool.container.DatabaseImpl).Get_table_meta_split(namespace);
        // NOTE(review): the listing goes through container.ZK rather than the zookeeper argument; kept as in the original.
        List<String> partitions = ObserverTool.container.ZK.getChildren(existRoot + partitionListNode, false);
        Collections.sort(partitions);

        byte[] hash_all = zookeeper.getData(existPath, false, new Stat());
        boolean hash_all_change = false;
        for (String partition : partitions) {
            String partitionExistPath = existPath + "/" + partition;
            String partitionTmpPath = tmpPath + "/" + partition;

            byte[] hash = zookeeper.getData(partitionExistPath, false, new Stat());
            List<String> children = zookeeper.getChildren(partitionTmpPath, false);
            if (children.isEmpty()) {
                continue;
            }
            hash_all_change = true;
            for (String child : children) {
                byte[] tmphash = zookeeper.getData(partitionTmpPath + "/" + child, false, new Stat());
                if (traceMerges) {
                    HashObject hash1 = HashUtil.Parse(hash);
                    HashObject hash2 = HashUtil.Parse(tmphash);
                    LogUtil.debug(" **********************************  hash step 3 " + nodeName + "/" + partition + "\t" + hash1.insert_count + "+" + hash2.insert_count + "=" + (hash1.insert_count + hash2.insert_count) + "\t" + hash1.delete_count + "+" + hash2.delete_count + "=" + (hash1.delete_count + hash2.delete_count));
                }
                hash = HashUtil.merge_hash(hash, tmphash);
                hash_all = HashUtil.merge_hash(hash_all, tmphash);
            }
            zookeeper.setData(partitionExistPath, hash, -1);
            writeHashRow(table_meta_split, nodeName + "_" + partition, hash);
            deleteChildren(zookeeper, partitionTmpPath, children);
        }
        if (hash_all_change) {
            zookeeper.setData(existPath, hash_all, -1);
            writeHashRow(totalTable, nodeName, hash_all);
        }
    }

    /**
     * Parses {@code hash} and stores its insert count ({@code f:hi}), delete count ({@code f:hd})
     * and hash code ({@code f:hc}) in {@code table} under {@code rowKey}, flushing immediately.
     */
    private static void writeHashRow(HTable table, String rowKey, byte[] hash) throws Exception {
        HashObject hashObject = HashUtil.Parse(hash);
        Put put_table = new Put(ByteToObject.toBytes(rowKey));
        put_table.addColumn(ByteToObject.toBytes("f"), ByteToObject.toBytes("hi"), ByteToObject.toBytes(FormatStaticUtil.toString(hashObject.insert_count)));
        put_table.addColumn(ByteToObject.toBytes("f"), ByteToObject.toBytes("hd"), ByteToObject.toBytes(FormatStaticUtil.toString(hashObject.delete_count)));
        put_table.addColumn(ByteToObject.toBytes("f"), ByteToObject.toBytes("hc"), hashObject.hash_code);
        table.put(put_table);
        table.flushCommits();
    }

    /** Deletes each of the given child nodes of {@code parentPath}, passing version {@code -1}. */
    private static void deleteChildren(ZKWrapper zookeeper, String parentPath, List<String> children) throws Exception {
        for (String child : children) {
            zookeeper.delete(parentPath + "/" + child, -1);
        }
    }

    /**
     * Starts the observer worker threads exactly once.
     *
     * <p>The method is {@code synchronized} and guarded by {@link #started}, so repeated or
     * concurrent calls start the threads only the first time.
     *
     * @param configuration configuration handed to {@code ObserverMetaThread}
     */
    public static synchronized void Start(Configuration configuration) {
        if (started) {
            return;
        }
        LogUtil.info(" ********************************** ObserverTool\tstart");
        ObserverMetaThread metaThread = new ObserverMetaThread(configuration);
        metaThread.start();
        ObserverHashThread hashThread = new ObserverHashThread();
        hashThread.start();
        StatThread statThread = new StatThread();
        statThread.start();
        started = true;
    }
}

