/*
 * Decompiled with CFR 0.152.
 */
package edu.sysu.pmglab.pconsumer;

import edu.sysu.pmglab.executor.ThreadQueue;
import edu.sysu.pmglab.pconsumer.Node;
import edu.sysu.pmglab.pconsumer.TaskConsumer;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
 * Hands submitted inputs to worker tasks and applies the resulting
 * {@code write}/{@code close} calls to each output in submission order.
 *
 * <p>Every submission for a given output receives a sequential task id. A worker
 * runs {@link TaskConsumer#process} outside any lock, then waits on the output's
 * monitor until the output's write counter equals the task id before calling
 * {@code write} (or {@code close}) and advancing the counter.
 *
 * <p>Nodes are recycled through {@code containerQueue}; {@link #start()} creates
 * one node per worker task, so {@link #submit} and {@link #close(Object)} block
 * while all nodes are in flight.
 *
 * <p>Outputs are used both as map keys and as monitors, so they must be non-null.
 * NOTE(review): key lookup uses the output's {@code equals}/{@code hashCode} while
 * locking uses object identity; outputs are presumably expected to use identity
 * equality - confirm against callers.
 *
 * @param <I> input type handed to the processor
 * @param <O> output type the processor writes to
 */
public abstract class POrderedConsumer<I, O> {
    final ThreadQueue queue;
    /** Creates the zero-initialised counter for an output seen for the first time. */
    private final Function<O, AtomicInteger> COUNTER = o -> new AtomicInteger(0);
    /** Nodes carrying work (or a stop marker) for the worker tasks. */
    private final BlockingQueue<Node<I, O>> taskQueue = new LinkedBlockingQueue<>();
    /** Idle nodes available for the next submission. */
    private final BlockingQueue<Node<I, O>> containerQueue = new LinkedBlockingQueue<>();
    /** Next task id to hand out, per output. */
    private final ConcurrentHashMap<O, AtomicInteger> taskIds = new ConcurrentHashMap<>();
    /** Id of the task whose turn it is to write, per output. */
    private final ConcurrentHashMap<O, AtomicInteger> writeIds = new ConcurrentHashMap<>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Creates a consumer backed by a {@link ThreadQueue} built with {@code threads}.
     *
     * @param threads value passed to the {@code ThreadQueue} constructor
     */
    public POrderedConsumer(int threads) {
        this.queue = new ThreadQueue(threads);
    }

    /**
     * Creates one node and registers one worker task per thread reported by the queue.
     *
     * <p>Each worker owns a processor from {@link #initProcessor()} and loops until it
     * receives a stop node, which it returns to the idle pool before stopping the
     * processor.
     *
     * @param <T> concrete subtype expected by the caller
     * @return this instance, cast to {@code T}
     */
    // The cast is unchecked by nature: the caller chooses T, and it is only safe when T
    // is the runtime type (or a supertype) of this instance.
    @SuppressWarnings("unchecked")
    public <T extends POrderedConsumer<I, O>> T start() {
        for (int i = 0; i < this.queue.getThreads(); ++i) {
            this.containerQueue.add(this.initNode());
            this.queue.addTask((status, context) -> {
                TaskConsumer<I, O> processor = this.initProcessor();
                Node<I, O> task;
                while (!(task = this.taskQueue.take()).isStop()) {
                    if (task.isTask()) {
                        // Processing happens outside the monitor; only the write is ordered.
                        processor.process(task.getInput());
                        O output = task.getOutput();
                        synchronized (output) {
                            AtomicInteger writeId = this.writeIds.computeIfAbsent(output, this.COUNTER);
                            // Wait on the same object that is locked until it is this task's turn.
                            while (writeId.get() != task.getTaskId()) {
                                output.wait();
                            }
                            processor.write(output);
                            writeId.incrementAndGet();
                            output.notifyAll();
                        }
                    }
                    if (task.isClose()) {
                        O output = task.getOutput();
                        synchronized (output) {
                            AtomicInteger writeId = this.writeIds.computeIfAbsent(output, this.COUNTER);
                            // A close request takes its place in the same per-output sequence.
                            while (writeId.get() != task.getTaskId()) {
                                output.wait();
                            }
                            processor.close(output);
                            writeId.incrementAndGet();
                            output.notifyAll();
                        }
                    }
                    // Recycle the node so a blocked submitter can proceed.
                    this.containerQueue.put(task);
                }
                // Return the stop node as well, then let the processor shut down.
                this.containerQueue.put(task);
                processor.stop();
            });
        }
        return (T)this;
    }

    /**
     * Creates a reusable node; called once per worker task from {@link #start()}.
     *
     * @return a new node
     */
    public abstract Node<I, O> initNode();

    /**
     * Creates the processor used by a single worker task.
     *
     * @return a new processor
     */
    public abstract TaskConsumer<I, O> initProcessor();

    /**
     * Queues {@code bytes} for processing and for an ordered write to {@code output}.
     *
     * <p>Blocks until an idle node is available.
     *
     * @param bytes  input handed to the processor
     * @param output destination; must not be {@code null}
     * @throws IllegalStateException if {@link #stop()} has already been called
     * @throws NullPointerException  if {@code output} is {@code null}
     * @throws InterruptedException  if interrupted while waiting for a node
     */
    public final void submit(I bytes, O output) throws InterruptedException {
        Node<I, O> node = this.acquireNode(output);
        node.update(bytes, output, this.nextTaskId(output));
        this.taskQueue.put(node);
    }

    /**
     * Queues an ordered close request for {@code output}.
     *
     * <p>The request takes the next task id of that output, so it runs after every
     * previously submitted task for the same output. Blocks until an idle node is
     * available.
     *
     * @param output destination to close; must not be {@code null}
     * @throws IllegalStateException if {@link #stop()} has already been called
     * @throws NullPointerException  if {@code output} is {@code null}
     * @throws InterruptedException  if interrupted while waiting for a node
     */
    public final void close(O output) throws InterruptedException {
        Node<I, O> node = this.acquireNode(output);
        node.close(output, this.nextTaskId(output));
        this.taskQueue.put(node);
    }

    /**
     * Sends one stop node per worker, closes the thread queue and clears all state.
     *
     * <p>Only the first call has any effect; later calls return immediately.
     *
     * @throws IOException if the calling thread is interrupted while waiting for a node
     *                     (the interrupt flag is restored first); presumably also if
     *                     closing the thread queue fails - confirm against ThreadQueue
     */
    public final void stop() throws IOException {
        try {
            if (this.isClosed.compareAndSet(false, true)) {
                for (int i = 0; i < this.queue.getThreads(); ++i) {
                    Node<I, O> node = this.containerQueue.take();
                    node.stop();
                    this.taskQueue.put(node);
                }
                this.queue.close();
                this.containerQueue.clear();
                this.taskQueue.clear();
                this.taskIds.clear();
                this.writeIds.clear();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /** Validates the request, then blocks until an idle node can be taken. */
    private Node<I, O> acquireNode(O output) throws InterruptedException {
        if (this.isClosed.get()) {
            throw new IllegalStateException("Encoder is already closed");
        }
        // Reject null before taking a node: the counter maps cannot hold a null key, and
        // failing after the take would leave that node permanently out of the pool.
        if (output == null) {
            throw new NullPointerException("output");
        }
        return this.containerQueue.take();
    }

    /**
     * Draws the next sequential id for {@code output}.
     *
     * <p>Call only while holding a node. Ids are then only ever given to callers that
     * are guaranteed to enqueue, so the task whose turn it is can never be stuck
     * waiting for a node while workers holding every node wait for that task.
     */
    private int nextTaskId(O output) {
        return this.taskIds.computeIfAbsent(output, this.COUNTER).getAndIncrement();
    }
}

