/*
 * Decompiled with CFR 0.152.
 */
package org.apache.accumulo.test.functional;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.test.functional.FunctionalTest;
import org.apache.hadoop.io.Text;

public class BatchWriterFlushTest
extends FunctionalTest {
    private static final int NUM_TO_FLUSH = 100000;

    @Override
    public void cleanup() throws Exception {
    }

    @Override
    public Map<String, String> getInitialConfig() {
        return Collections.emptyMap();
    }

    @Override
    public List<FunctionalTest.TableSetup> getTablesToCreate() {
        ArrayList<FunctionalTest.TableSetup> tables = new ArrayList<FunctionalTest.TableSetup>();
        tables.add(new FunctionalTest.TableSetup("bwft"));
        tables.add(new FunctionalTest.TableSetup("bwlt"));
        return tables;
    }

    @Override
    public void run() throws Exception {
        this.runFlushTest();
        this.runLatencyTest();
    }

    private void runLatencyTest() throws Exception {
        BatchWriter bw = this.getConnector().createBatchWriter("bwlt", new BatchWriterConfig().setMaxLatency(2000L, TimeUnit.MILLISECONDS));
        Scanner scanner = this.getConnector().createScanner("bwlt", Constants.NO_AUTHS);
        Mutation m = new Mutation(new Text(String.format("r_%10d", 1)));
        m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(Constants.UTF8)));
        bw.addMutation(m);
        UtilWaitThread.sleep((long)1000L);
        int count = 0;
        for (Map.Entry entry : scanner) {
            ++count;
        }
        if (count != 0) {
            throw new Exception("Flushed too soon");
        }
        UtilWaitThread.sleep((long)4000L);
        for (Map.Entry entry : scanner) {
            ++count;
        }
        if (count != 1) {
            throw new Exception("Did not flush");
        }
        bw.close();
    }

    private void runFlushTest() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, Exception {
        BatchWriter bw = this.getConnector().createBatchWriter("bwft", new BatchWriterConfig());
        Scanner scanner = this.getConnector().createScanner("bwft", Constants.NO_AUTHS);
        Random r = new Random();
        for (int i = 0; i < 4; ++i) {
            Map.Entry entry;
            for (int j = 0; j < 100000; ++j) {
                int row = i * 100000 + j;
                Mutation m = new Mutation(new Text(String.format("r_%10d", row)));
                m.put(new Text("cf"), new Text("cq"), new Value(("" + row).getBytes()));
                bw.addMutation(m);
            }
            bw.flush();
            for (int k = 0; k < 10; ++k) {
                int rowToLookup = r.nextInt(100000) + i * 100000;
                scanner.setRange(new Range(new Text(String.format("r_%10d", rowToLookup))));
                Iterator iter = scanner.iterator();
                if (!iter.hasNext()) {
                    throw new Exception(" row " + rowToLookup + " not found after flush");
                }
                entry = (Map.Entry)iter.next();
                if (iter.hasNext()) {
                    throw new Exception("Scanner returned too much");
                }
                this.verifyEntry(rowToLookup, entry);
            }
            scanner.setRange(new Range(new Text(String.format("r_%10d", i * 100000)), true, new Text(String.format("r_%10d", (i + 1) * 100000)), false));
            Iterator iter = scanner.iterator();
            for (int j = 0; j < 100000; ++j) {
                int row = i * 100000 + j;
                if (!iter.hasNext()) {
                    throw new Exception("Scan stopped permaturely at " + row);
                }
                entry = (Map.Entry)iter.next();
                this.verifyEntry(row, entry);
            }
            if (!iter.hasNext()) continue;
            throw new Exception("Scanner returned too much");
        }
        bw.close();
        boolean caught = false;
        try {
            bw.addMutation(new Mutation(new Text("foobar")));
        }
        catch (IllegalStateException ise) {
            caught = true;
        }
        if (!caught) {
            throw new Exception("Adding to closed batch writer did not fail");
        }
    }

    private void verifyEntry(int row, Map.Entry<Key, Value> entry) throws Exception {
        if (!entry.getKey().getRow().toString().equals(String.format("r_%10d", row))) {
            throw new Exception("Unexpected key returned, expected " + row + " got " + entry.getKey());
        }
        if (!entry.getValue().toString().equals("" + row)) {
            throw new Exception("Unexpected value, expected " + row + " got " + entry.getValue());
        }
    }
}

