package edu.sysu.pmglab.pconsumer;

import edu.sysu.pmglab.executor.ThreadQueue;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/* loaded from: input_file:edu/sysu/pmglab/pconsumer/POrderedConsumer.class */
public abstract class POrderedConsumer<I, O> {
    final ThreadQueue queue;
    private final Function<O, AtomicInteger> COUNTER = obj -> {
        return new AtomicInteger(0);
    };
    private final BlockingQueue<Node<I, O>> taskQueue = new LinkedBlockingQueue();
    private final BlockingQueue<Node<I, O>> containerQueue = new LinkedBlockingQueue();
    private final ConcurrentHashMap<O, AtomicInteger> taskIds = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<O, AtomicInteger> writeIds = new ConcurrentHashMap<>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    public POrderedConsumer(int i) {
        this.queue = new ThreadQueue(i);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T extends POrderedConsumer<I, O>> T start() {
        for (int i = 0; i < this.queue.getThreads(); i++) {
            this.containerQueue.add(initNode());
            this.queue.addTask((status, context) -> {
                TaskConsumer<I, O> initProcessor = initProcessor();
                while (true) {
                    Node<I, O> take = this.taskQueue.take();
                    if (take.isStop()) {
                        this.containerQueue.put(take);
                        initProcessor.stop();
                        return;
                    }
                    if (take.isTask()) {
                        initProcessor.process(take.getInput());
                        synchronized (take.getOutput()) {
                            AtomicInteger atomicInteger = (AtomicInteger) this.writeIds.computeIfAbsent(take.getOutput(), this.COUNTER);
                            while (atomicInteger.get() != take.getTaskId()) {
                                take.getOutput().wait();
                            }
                            initProcessor.write(take.getOutput());
                            atomicInteger.incrementAndGet();
                            take.getOutput().notifyAll();
                        }
                    } else if (take.isClose()) {
                        synchronized (take.getOutput()) {
                            AtomicInteger atomicInteger2 = (AtomicInteger) this.writeIds.computeIfAbsent(take.getOutput(), this.COUNTER);
                            while (atomicInteger2.get() != take.getTaskId()) {
                                take.getOutput().wait();
                            }
                            initProcessor.close(take.getOutput());
                            atomicInteger2.incrementAndGet();
                            take.getOutput().notifyAll();
                        }
                    } else {
                        continue;
                    }
                    this.containerQueue.put(take);
                }
            });
        }
        return this;
    }

    public abstract Node<I, O> initNode();

    public abstract TaskConsumer<I, O> initProcessor();

    public final void submit(I i, O o) throws InterruptedException {
        if (this.isClosed.get()) {
            throw new IllegalStateException("Encoder is already closed");
        }
        int andIncrement = ((AtomicInteger) this.taskIds.computeIfAbsent(o, this.COUNTER)).getAndIncrement();
        Node<I, O> take = this.containerQueue.take();
        take.update(i, o, andIncrement);
        this.taskQueue.put(take);
    }

    public final void close(O o) throws InterruptedException {
        if (this.isClosed.get()) {
            throw new IllegalStateException("Encoder is already closed");
        }
        int andIncrement = ((AtomicInteger) this.taskIds.computeIfAbsent(o, this.COUNTER)).getAndIncrement();
        Node<I, O> take = this.containerQueue.take();
        take.close(o, andIncrement);
        this.taskQueue.put(take);
    }

    public final void stop() throws IOException {
        try {
            if (this.isClosed.compareAndSet(false, true)) {
                for (int i = 0; i < this.queue.getThreads(); i++) {
                    Node<I, O> take = this.containerQueue.take();
                    take.stop();
                    this.taskQueue.put(take);
                }
                this.queue.close();
                this.containerQueue.clear();
                this.taskQueue.clear();
                this.taskIds.clear();
                this.writeIds.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }
}
