/*
 * Decompiled with CFR 0.152.
 */
package edu.sysu.pmglab.ccf.toolkit;

import edu.sysu.pmglab.RuntimeProperty;
import edu.sysu.pmglab.ccf.toolkit.converter.IConverter;
import edu.sysu.pmglab.ccf.toolkit.converter.ILiteConverter;
import edu.sysu.pmglab.ccf.toolkit.input.InputOption;
import edu.sysu.pmglab.ccf.toolkit.input.InputProducer;
import edu.sysu.pmglab.ccf.toolkit.listener.IListener;
import edu.sysu.pmglab.ccf.toolkit.output.CustomBridge;
import edu.sysu.pmglab.ccf.toolkit.output.CustomOutput;
import edu.sysu.pmglab.ccf.toolkit.output.OutputConsumer;
import edu.sysu.pmglab.ccf.toolkit.output.OutputOption;
import edu.sysu.pmglab.container.list.List;
import edu.sysu.pmglab.executor.ThreadQueue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class Processor<I, T extends InputOption<I, ?, ?>, O, K extends OutputOption<O, ?>> {
    /** Input options that {@link #submit(int)} iterates over, in list order. */
    final List<T> sources;
    /** Full converter; when non-null it is used instead of {@link #lc}. */
    final IConverter<I, T, O, K> c;
    /** Lite converter; consulted only when {@link #c} is {@code null}. */
    final ILiteConverter<I, O> lc;
    /** Output option from which the writers are obtained. */
    final K target;
    /** Progress listener; never {@code null}, defaults to {@code IListener.EMPTY}. */
    IListener<List<T>, K> listener = IListener.EMPTY;

    /**
     * Creates a processor. Exactly one of {@code c} / {@code lc} is expected to be non-null
     * by the factory methods in {@link OutputSetting}; no validation happens here.
     *
     * @param sources input options to read from
     * @param c       full converter, or {@code null}
     * @param lc      lite converter, or {@code null}
     * @param target  output option to write to
     */
    private Processor(List<T> sources, IConverter<I, T, O, K> c, ILiteConverter<I, O> lc, K target) {
        this.target = target;
        this.lc = lc;
        this.c = c;
        this.sources = sources;
    }

    /**
     * Starts a pipeline definition that has no inputs yet.
     *
     * @return a new input setting seeded from {@code List.EMPTY()}
     */
    public static <I, T extends InputOption<I, ?, ?>> InputSetting<I, T> init() {
        InputSetting setting = new InputSetting(List.EMPTY());
        return setting;
    }

    /**
     * Starts a pipeline definition from a single input.
     *
     * @param input the input option; a {@code null} element is skipped by the setting's constructor
     * @return a new input setting seeded from {@code List.singleton(input)}
     */
    public static <I, T extends InputOption<I, ?, ?>> InputSetting<I, T> setInput(T input) {
        InputSetting setting = new InputSetting(List.singleton(input));
        return setting;
    }

    /**
     * Starts a pipeline definition from an array of inputs.
     *
     * @param inputs the input options, wrapped with {@code List.wrap}; {@code null} elements are
     *               skipped by the setting's constructor
     * @return a new input setting holding the non-null elements
     */
    public static <I, T extends InputOption<I, ?, ?>> InputSetting<I, T> setInputs(T[] inputs) {
        InputSetting setting = new InputSetting(List.wrap(inputs));
        return setting;
    }

    /**
     * Starts a pipeline definition from any iterable of inputs.
     *
     * @param inputs the input options; may be {@code null}, and {@code null} elements are skipped
     *               by the setting's constructor
     * @return a new input setting holding the non-null elements
     */
    public static <I, T extends InputOption<I, ?, ?>> InputSetting<I, T> setInputs(Iterable<T> inputs) {
        InputSetting setting = new InputSetting(inputs);
        return setting;
    }

    /**
     * Installs the progress listener used by {@link #submit(int)}.
     *
     * @param listener the listener to notify, or {@code null} to fall back to {@code IListener.EMPTY}
     * @return this processor, for chaining
     */
    public Processor<I, T, O, K> setListener(IListener<List<T>, K> listener) {
        if (listener == null) {
            this.listener = IListener.EMPTY;
            return this;
        }
        this.listener = listener;
        return this;
    }

    /**
     * Reads every record of every source, converts it and writes the result to the target.
     *
     * <p>The thread count is first normalised by {@code RuntimeProperty.verifyThreads}. The listener
     * is notified at the start, after every record, and at the end with the total number of records
     * read and written. With no sources, the writers are opened with zero parts and closed
     * immediately. Otherwise one writer part is requested when {@code threads == 1}, and
     * {@code threads * sources.size()} parts are requested in every other case. If the writers
     * report a single part, the sources are processed on the calling thread; otherwise each reader
     * is processed as a task on a {@code ThreadQueue}.
     *
     * <p>Readers and writers are closed even when reading, converting or writing fails. A failure
     * raised while closing after an earlier failure is attached to that earlier failure as a
     * suppressed exception rather than replacing it.
     *
     * @param threads requested number of worker threads
     * @throws IOException if reading, converting, writing or closing fails
     */
    public void submit(int threads) throws IOException {
        threads = RuntimeProperty.verifyThreads(threads);
        this.listener.start(this.sources, this.target);
        AtomicLong reads = new AtomicLong();
        AtomicLong writes = new AtomicLong();
        if (this.sources.size() == 0) {
            // Nothing to convert: the writers are still opened and closed.
            OutputConsumer writers = this.target.getWriters(0);
            writers.close();
        } else {
            OutputConsumer writers = this.target.getWriters(threads == 1 ? 1 : threads * this.sources.size());
            Throwable failure = null;
            try {
                if (writers.numOfParts() == 1) {
                    this.convertSequentially(writers, reads, writes);
                } else {
                    this.convertConcurrently(threads, writers, reads, writes);
                }
            } catch (Throwable t) {
                // Remember the primary failure so a close failure cannot mask it.
                failure = t;
                throw t;
            } finally {
                if (failure == null) {
                    writers.close();
                } else {
                    try {
                        writers.close();
                    } catch (Exception closeFailure) {
                        failure.addSuppressed(closeFailure);
                    }
                }
            }
        }
        this.listener.stop(this.sources, this.target, reads.get(), writes.get());
    }

    /** Converts all sources on the calling thread, writing everything to writer part 0. */
    private void convertSequentially(OutputConsumer writers, AtomicLong reads, AtomicLong writes) throws IOException {
        for (InputOption source : this.sources) {
            if (source.numOfRecords() == 0L) {
                continue;
            }
            InputProducer reader = source.getReaders(1).fastGet(0);
            List<InputOption> wrapper = List.singleton(source);
            long readInCurrentThread = 0L;
            long writeInCurrentThread = 0L;
            Throwable failure = null;
            try {
                Object input;
                while ((input = reader.read()) != null) {
                    int count;
                    if (this.c == null) {
                        O output = this.lc.converter(input);
                        count = writers.write(0, output);
                    } else {
                        // The converter receives tell() - 1, or -1 when tell() itself reports -1.
                        long pointer = reader.tell();
                        Iterable<O> outputs = this.c.converter(input, source, pointer == -1L ? -1L : pointer - 1L, this.target);
                        count = writers.write(0, outputs);
                    }
                    this.listener.step(wrapper, this.target, 1L, count);
                    ++readInCurrentThread;
                    writeInCurrentThread += (long)count;
                }
            } catch (Throwable t) {
                failure = t;
                throw t;
            } finally {
                if (failure == null) {
                    reader.close();
                } else {
                    try {
                        reader.close();
                    } catch (Exception closeFailure) {
                        failure.addSuppressed(closeFailure);
                    }
                }
            }
            reads.addAndGet(readInCurrentThread);
            writes.addAndGet(writeInCurrentThread);
        }
    }

    /** Converts all sources on a thread queue; each reader is one task bound to its own writer part. */
    private void convertConcurrently(int threads, OutputConsumer writers, AtomicLong reads, AtomicLong writes) throws IOException {
        try (ThreadQueue queue = new ThreadQueue(threads);){
            // Hands out consecutive writer-part indexes, one per reader across all sources.
            AtomicInteger indexes = new AtomicInteger();
            for (InputOption source : this.sources) {
                if (source.numOfRecords() == 0L) {
                    continue;
                }
                List readers = source.getReaders(threads);
                for (InputProducer reader : readers) {
                    int index = indexes.getAndAdd(1);
                    queue.addTask((status, context) -> {
                        List<InputOption> wrapper = List.singleton(source);
                        long readInCurrentThread = 0L;
                        long writeInCurrentThread = 0L;
                        Throwable failure = null;
                        try {
                            Object input;
                            while ((input = reader.read()) != null) {
                                int count;
                                if (this.c == null) {
                                    O output = this.lc.converter(input);
                                    count = writers.write(index, output);
                                } else {
                                    long pointer = reader.tell();
                                    Iterable<O> outputs = this.c.converter(input, source, pointer == -1L ? -1L : pointer - 1L, this.target);
                                    count = writers.write(index, outputs);
                                }
                                this.listener.step(wrapper, this.target, 1L, count);
                                ++readInCurrentThread;
                                writeInCurrentThread += (long)count;
                            }
                        } catch (Throwable t) {
                            failure = t;
                            throw t;
                        } finally {
                            if (failure == null) {
                                reader.close();
                            } else {
                                try {
                                    reader.close();
                                } catch (Exception closeFailure) {
                                    failure.addSuppressed(closeFailure);
                                }
                            }
                        }
                        // Only a fully drained reader marks its writer part as finished.
                        writers.finish(index);
                        reads.addAndGet(readInCurrentThread);
                        writes.addAndGet(writeInCurrentThread);
                    });
                }
            }
        }
    }

    /*
     * Constructor marked synthetic by the decompiler. It forwards its first four arguments to the
     * private constructor; the fifth argument (x4) is never used.
     */
    /* synthetic */ Processor(List x0, IConverter x1, ILiteConverter x2, OutputOption x3, 1 x4) {
        this(x0, x1, x2, x3);
    }

    /**
     * Second builder stage: holds the chosen inputs and output and creates the {@link Processor}
     * once a converter is supplied through one of the {@code bridge} overloads.
     */
    public static class OutputSetting<I, T extends InputOption<I, ?, ?>, O, K extends OutputOption<O, ?>> {
        /** Inputs handed over by {@link InputSetting}. */
        final List<T> inputs;
        /** Output option the processor will write to. */
        final K output;

        private OutputSetting(List<T> inputs, K output) {
            this.inputs = inputs;
            this.output = output;
        }

        /**
         * Lets the caller adjust the output option with knowledge of the inputs.
         *
         * @param consumer callback receiving the inputs and the output; ignored when {@code null}
         * @return this setting, for chaining
         * @throws IOException if the callback fails
         */
        public OutputSetting<I, T, O, K> configureOption(CustomOutput<T, K> consumer) throws IOException {
            if (consumer != null) {
                consumer.accept(this.inputs, this.output);
            }
            return this;
        }

        /**
         * Creates a processor that converts records with a full converter.
         *
         * @param converter the converter to use
         * @return the configured processor
         * @throws IllegalArgumentException if {@code converter} is {@code null}
         */
        public Processor<I, T, O, K> bridge(IConverter<I, T, O, K> converter) {
            if (converter == null) {
                // The message names the argument that is actually null.
                throw new IllegalArgumentException("Invalid converter: null");
            }
            return new Processor<I, T, O, K>(this.inputs, converter, null, this.output);
        }

        /**
         * Creates a processor that converts records with a lite converter.
         *
         * @param converter the lite converter to use
         * @return the configured processor
         * @throws IllegalArgumentException if {@code converter} is {@code null}
         */
        public Processor<I, T, O, K> bridge(ILiteConverter<I, O> converter) {
            if (converter == null) {
                throw new IllegalArgumentException("Invalid converter: null");
            }
            return new Processor<I, T, O, K>(this.inputs, null, converter, this.output);
        }

        /**
         * Creates a processor whose converter is built by {@code consumer} from the inputs and output.
         *
         * @param consumer factory that produces the converter
         * @return the configured processor
         * @throws IllegalArgumentException if {@code consumer} is {@code null}
         * @throws IOException              if the factory fails
         */
        public Processor<I, T, O, K> bridge(CustomBridge<I, T, O, K> consumer) throws IOException {
            if (consumer == null) {
                throw new IllegalArgumentException("Invalid custom bridge: null");
            }
            IConverter<I, T, O, K> converter = consumer.instance(this.inputs, this.output);
            return new Processor<I, T, O, K>(this.inputs, converter, null, this.output);
        }

        /*
         * Constructor marked synthetic by the decompiler; forwards to the private constructor and
         * ignores its third argument.
         */
        /* synthetic */ OutputSetting(List x0, OutputOption x1, 1 x2) {
            this(x0, x1);
        }
    }

    /**
     * First builder stage: collects the non-null input options and then moves on to
     * {@link OutputSetting} once an output is chosen.
     */
    public static class InputSetting<I, T extends InputOption<I, ?, ?>> {
        /** Collected inputs; {@code null} entries are never stored. */
        final List<T> inputs = new List<>();

        private InputSetting(Iterable<T> inputs) {
            this.appendNonNull(inputs);
        }

        /** Appends every non-null element of {@code candidates}; a {@code null} iterable is ignored. */
        private void appendNonNull(Iterable<T> candidates) {
            if (candidates == null) {
                return;
            }
            for (T candidate : candidates) {
                if (candidate != null) {
                    this.inputs.add(candidate);
                }
            }
        }

        /**
         * Adds one input.
         *
         * @param input the input to add; ignored when {@code null}
         * @return this setting, for chaining
         */
        public InputSetting<I, T> addInput(T input) {
            if (input != null) {
                this.inputs.add(input);
            }
            return this;
        }

        /**
         * Adds all non-null inputs of an array.
         *
         * @param inputs the inputs to add; a {@code null} array and {@code null} elements are ignored
         * @return this setting, for chaining
         */
        public InputSetting<I, T> addInputs(T[] inputs) {
            if (inputs != null) {
                for (T input : inputs) {
                    if (input != null) {
                        this.inputs.add(input);
                    }
                }
            }
            return this;
        }

        /**
         * Adds all non-null inputs of an iterable.
         *
         * @param inputs the inputs to add; a {@code null} iterable and {@code null} elements are ignored
         * @return this setting, for chaining
         */
        public InputSetting<I, T> addInputs(Iterable<T> inputs) {
            this.appendNonNull(inputs);
            return this;
        }

        /**
         * Chooses the output and moves to the next builder stage.
         *
         * @param output the output option
         * @return the output setting, holding {@code inputs.asUnmodifiable()} and {@code output}
         * @throws IllegalArgumentException if {@code output} is {@code null}
         */
        public <O, K extends OutputOption<O, ?>> OutputSetting<I, T, O, K> setOutput(K output) {
            if (output == null) {
                throw new IllegalArgumentException("Invalid output option: null");
            }
            return new OutputSetting<I, T, O, K>(this.inputs.asUnmodifiable(), output);
        }

        /**
         * Chooses the output, lets {@code consumer} adjust it, and moves to the next builder stage.
         *
         * @param output   the output option
         * @param consumer callback receiving {@code inputs.asUnmodifiable()} and the output;
         *                 ignored when {@code null}
         * @return the output setting, holding {@code inputs.asUnmodifiable()} and {@code output}
         * @throws IllegalArgumentException if {@code output} is {@code null}
         * @throws IOException              if the callback fails
         */
        public <O, K extends OutputOption<O, ?>> OutputSetting<I, T, O, K> setOutput(K output, CustomOutput<T, K> consumer) throws IOException {
            if (output == null) {
                throw new IllegalArgumentException("Invalid output option: null");
            }
            if (consumer != null) {
                consumer.accept(this.inputs.asUnmodifiable(), output);
            }
            return new OutputSetting<I, T, O, K>(this.inputs.asUnmodifiable(), output);
        }
    }
}

