/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.FedXQueueCursor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.QueryInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of a {@link ParallelExecutor} that is at the same time the iteration over the results it
 * produces.
 *
 * <p>
 * {@link #run()} drives the subclass-provided {@link #performExecution()}. Result iterations handed in through
 * {@link #addResult(CloseableIteration)} are buffered in {@link #rightQueue} and drained one after the other by
 * {@link #getNextElement()}. Errors raised during execution are forwarded to the queue via {@link #toss(Exception)}
 * and are re-checked by the consumer once the queue is exhausted.
 *
 * @param <T> the element type of the produced results
 */
public abstract class ParallelExecutorBase<T> extends LookAheadIteration<T> implements ParallelExecutor<T> {
    /** Logger shared by this class and its subclasses. */
    protected static final Logger log = LoggerFactory.getLogger(ParallelExecutorBase.class);

    /** Counter from which {@link #executorId} is drawn; incremented once per constructed executor. */
    protected static final AtomicLong NEXT_EXECUTOR_ID = new AtomicLong(0L);

    /** Evaluation strategy obtained from the {@link QueryInfo} passed to the constructor. */
    protected final FederationEvalStrategy strategy;

    /** Identifier of this executor, used in {@link #getId()}. */
    protected final long executorId;

    /** Information about the query this executor belongs to. */
    protected final QueryInfo queryInfo;

    /** The thread currently inside {@link #run()}, or {@code null} when {@link #run()} is not executing. */
    protected volatile Thread evaluationThread;

    /** Queue of result iterations awaiting consumption (created with 1024, presumably its capacity - TODO confirm). */
    protected FedXQueueCursor<T> rightQueue = FedXQueueCursor.create(1024);

    /** The result iteration currently being drained by {@link #getNextElement()}, or {@code null} if none. */
    protected volatile CloseableIteration<T> rightIter;

    /** Becomes {@code true} once {@link #run()} has left its execution section, successfully or not. */
    protected volatile boolean finished = false;

    /**
     * Creates an executor for the given query.
     *
     * @param queryInfo the query this executor works for; also supplies the evaluation strategy
     * @throws QueryEvaluationException declared for subclasses; not thrown by this constructor itself
     */
    public ParallelExecutorBase(QueryInfo queryInfo) throws QueryEvaluationException {
        this.strategy = queryInfo.getStrategy();
        this.executorId = NEXT_EXECUTOR_ID.incrementAndGet();
        this.queryInfo = queryInfo;
    }

    /**
     * Runs {@link #performExecution()} on the calling thread, followed by a timeout check and {@link #done()}.
     *
     * <p>
     * Any failure is passed to {@link #toss(Exception)} instead of being propagated. Regardless of the outcome, the
     * result queue is marked as done and {@link #isFinished()} reports {@code true} afterwards.
     */
    @Override
    public final void run() {
        // NOTE(review): the two early returns below neither call rightQueue.done() nor set 'finished';
        // confirm that consumers cannot be left waiting in these cases.
        if (this.isClosed()) {
            return;
        }
        final Thread currentThread = Thread.currentThread();
        this.evaluationThread = currentThread;
        if (currentThread.isInterrupted()) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("Performing execution of {}, thread: {}", this.getDisplayId(), currentThread.getName());
        }
        try {
            this.performExecution();
            this.checkTimeout();
            if (log.isTraceEnabled()) {
                log.trace("{} is finished.", this.getDisplayId());
            }
            this.done();
        } catch (InterruptedException e) {
            this.toss(ExceptionUtil.toException(e));
            // Restore the interrupt flag on the thread that actually received the interruption, without
            // going through the mutable 'evaluationThread' field.
            currentThread.interrupt();
        } catch (Throwable t) {
            this.toss(ExceptionUtil.toException(t));
        } finally {
            this.rightQueue.done();
            this.finished = true;
            this.evaluationThread = null;
        }
    }

    /**
     * Performs the actual work of this executor. Invoked exactly once per {@link #run()} that passes the initial
     * closed/interrupted checks.
     *
     * @throws Exception any failure; it is forwarded to {@link #toss(Exception)} by {@link #run()}
     */
    protected abstract void performExecution() throws Exception;

    /**
     * Adds a result iteration to the queue consumed by {@link #getNextElement()}.
     *
     * <p>
     * {@link EmptyIteration} instances are ignored. If this executor or its queue is already closed, the given
     * iteration is closed instead of being kept.
     *
     * @param res the result iteration to enqueue
     * @throws RuntimeException if the calling thread is interrupted while enqueueing; the interrupt flag is
     *                          restored and {@code res} is closed before the exception is thrown
     */
    @Override
    public void addResult(CloseableIteration<T> res) {
        try {
            if (res instanceof EmptyIteration) {
                return;
            }
            if (this.isClosed() || this.rightQueue.isClosed()) {
                res.close();
                return;
            }
            this.rightQueue.put(res);
            // A close may have happened while put() was in progress; in that case nobody will consume
            // the iteration any more, so release it here.
            if (this.isClosed() || this.rightQueue.isClosed()) {
                res.close();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            res.close();
            throw new RuntimeException("Error adding element to right queue", e);
        }
    }

    /**
     * Hook invoked by {@link #run()} after a successful execution. The default implementation does nothing.
     */
    @Override
    public void done() {
    }

    /**
     * Forwards the given exception to the result queue.
     *
     * @param e the exception to report
     */
    @Override
    public void toss(Exception e) {
        this.rightQueue.toss(e);
        if (log.isTraceEnabled()) {
            log.trace("Tossing exception of {}: {}", this.getDisplayId(), e.getMessage());
        }
    }

    /**
     * Returns the next element from the queued result iterations.
     *
     * <p>
     * Exhausted iterations are closed and replaced by the next one from the queue. Once neither a current iteration
     * nor a queued one is left, any exception recorded on the queue is surfaced via
     * {@code rightQueue.checkException()}.
     *
     * @return the next element, or {@code null} if no further element is available
     * @throws QueryEvaluationException if retrieving an element fails or an exception was recorded on the queue
     */
    @Override
    public T getNextElement() throws QueryEvaluationException {
        while (true) {
            // Work on a snapshot of the volatile field: handleClose() may reset it from another thread,
            // and re-reading the field between the null check and the use could fail with an NPE.
            CloseableIteration<T> current = this.rightIter;
            if (current == null) {
                if (!this.rightQueue.hasNext()) {
                    break;
                }
                current = this.rightQueue.next();
                this.rightIter = current;
            }
            if (current.hasNext()) {
                return current.next();
            }
            current.close();
            this.rightIter = null;
        }
        this.rightQueue.checkException();
        return null;
    }

    /**
     * Verifies that the query still has evaluation time left.
     *
     * @throws QueryInterruptedException if {@code queryInfo.getMaxRemainingTimeMS()} is zero or negative
     */
    protected void checkTimeout() throws QueryInterruptedException {
        long maxTimeLeft = this.queryInfo.getMaxRemainingTimeMS();
        if (maxTimeLeft <= 0L) {
            throw new QueryInterruptedException("Query evaluation has run into a timeout");
        }
    }

    /**
     * Releases the resources held by this executor: closes the result queue, then the iteration currently being
     * consumed (if any), and finally delegates to {@code super.handleClose()}.
     *
     * <p>
     * Failures while closing the current iteration are logged on trace level and otherwise ignored; the super
     * implementation is invoked even if an earlier step throws.
     *
     * @throws QueryEvaluationException if closing the queue or the super implementation fails
     */
    @Override
    public void handleClose() throws QueryEvaluationException {
        try {
            try {
                this.rightQueue.close();
            } finally {
                // Snapshot the volatile field so that the null check and the close() call see the same value.
                final CloseableIteration<T> current = this.rightIter;
                if (current != null) {
                    try {
                        current.close();
                    } catch (Throwable t) {
                        if (t instanceof InterruptedException) {
                            Thread.currentThread().interrupt();
                        }
                        // Deliberately best-effort: a failing close must not prevent the remaining cleanup.
                        log.trace("Failed to close the current result iteration:", t);
                    } finally {
                        // Drop the reference even if close() failed; the iteration must not be reused.
                        this.rightIter = null;
                    }
                }
            }
        } finally {
            super.handleClose();
        }
    }

    /**
     * @return {@code true} once {@link #run()} has left its execution section
     */
    @Override
    public boolean isFinished() {
        return this.finished;
    }

    /**
     * @return the query information this executor was created with
     */
    @Override
    public QueryInfo getQueryInfo() {
        return this.queryInfo;
    }

    /**
     * @return an identifier of the form {@code #<executorId> (Query: <queryId>)}
     */
    protected String getId() {
        return "#" + this.executorId + " (Query: " + this.queryInfo.getQueryID() + ")";
    }

    /**
     * @return the executor type followed by {@link #getId()}, as used in log messages
     */
    public String getDisplayId() {
        return this.getExecutorType() + " " + this.getId();
    }

    /**
     * @return a short label for the kind of executor; {@code "Executor"} unless overridden
     */
    protected String getExecutorType() {
        return "Executor";
    }

    /**
     * @return the executor type, the simple class name and the id of this executor
     */
    @Override
    public String toString() {
        return this.getExecutorType() + " " + this.getClass().getSimpleName() + " {id: " + this.getId() + "}";
    }
}

