/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.search.asynchronous.processor;

import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ActionListener;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveContext;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveStore;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchContextClosedException;
import org.opensearch.search.asynchronous.context.persistence.AsynchronousSearchPersistenceModel;
import org.opensearch.search.asynchronous.context.state.AsynchronousSearchState;
import org.opensearch.search.asynchronous.context.state.AsynchronousSearchStateMachine;
import org.opensearch.search.asynchronous.context.state.AsynchronousSearchStateMachineClosedException;
import org.opensearch.search.asynchronous.context.state.event.BeginPersistEvent;
import org.opensearch.search.asynchronous.context.state.event.SearchFailureEvent;
import org.opensearch.search.asynchronous.context.state.event.SearchResponsePersistFailedEvent;
import org.opensearch.search.asynchronous.context.state.event.SearchResponsePersistedEvent;
import org.opensearch.search.asynchronous.context.state.event.SearchSuccessfulEvent;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
import org.opensearch.search.asynchronous.service.AsynchronousSearchPersistenceService;
import org.opensearch.threadpool.ThreadPool;

public class AsynchronousSearchPostProcessor {
    private static final Logger logger = LogManager.getLogger(AsynchronousSearchPostProcessor.class);
    private final AsynchronousSearchPersistenceService asynchronousSearchPersistenceService;
    private final AsynchronousSearchActiveStore asynchronousSearchActiveStore;
    private final AsynchronousSearchStateMachine asynchronousSearchStateMachine;
    private final Consumer<AsynchronousSearchActiveContext> freeActiveContextConsumer;
    private final ThreadPool threadPool;

    public AsynchronousSearchPostProcessor(AsynchronousSearchPersistenceService asynchronousSearchPersistenceService, AsynchronousSearchActiveStore asynchronousSearchActiveStore, AsynchronousSearchStateMachine stateMachine, Consumer<AsynchronousSearchActiveContext> freeActiveContextConsumer, ThreadPool threadPool, ClusterService clusterService) {
        this.asynchronousSearchActiveStore = asynchronousSearchActiveStore;
        this.asynchronousSearchPersistenceService = asynchronousSearchPersistenceService;
        this.asynchronousSearchStateMachine = stateMachine;
        this.freeActiveContextConsumer = freeActiveContextConsumer;
        this.threadPool = threadPool;
    }

    public AsynchronousSearchResponse processSearchFailure(Exception exception, AsynchronousSearchContextId asynchronousSearchContextId) {
        Optional<AsynchronousSearchActiveContext> asynchronousSearchContextOptional = this.asynchronousSearchActiveStore.getContext(asynchronousSearchContextId);
        try {
            if (asynchronousSearchContextOptional.isPresent()) {
                AsynchronousSearchActiveContext asynchronousSearchContext = asynchronousSearchContextOptional.get();
                this.asynchronousSearchStateMachine.trigger(new SearchFailureEvent(asynchronousSearchContext, exception));
                this.handlePersist(asynchronousSearchContext);
                return asynchronousSearchContext.getAsynchronousSearchResponse();
            }
            return new AsynchronousSearchResponse(AsynchronousSearchState.FAILED, -1L, -1L, null, ExceptionsHelper.convertToOpenSearchException((Exception)exception));
        }
        catch (AsynchronousSearchStateMachineClosedException ex) {
            return new AsynchronousSearchResponse(AsynchronousSearchState.FAILED, -1L, -1L, null, ExceptionsHelper.convertToOpenSearchException((Exception)exception));
        }
    }

    public AsynchronousSearchResponse processSearchResponse(SearchResponse searchResponse, AsynchronousSearchContextId asynchronousSearchContextId) {
        Optional<AsynchronousSearchActiveContext> asynchronousSearchContextOptional = this.asynchronousSearchActiveStore.getContext(asynchronousSearchContextId);
        try {
            if (asynchronousSearchContextOptional.isPresent()) {
                AsynchronousSearchActiveContext asynchronousSearchContext = asynchronousSearchContextOptional.get();
                this.asynchronousSearchStateMachine.trigger(new SearchSuccessfulEvent(asynchronousSearchContext, searchResponse));
                this.handlePersist(asynchronousSearchContext);
                return asynchronousSearchContext.getAsynchronousSearchResponse();
            }
            return new AsynchronousSearchResponse(AsynchronousSearchState.SUCCEEDED, -1L, -1L, searchResponse, null);
        }
        catch (AsynchronousSearchStateMachineClosedException ex) {
            return new AsynchronousSearchResponse(AsynchronousSearchState.SUCCEEDED, -1L, -1L, searchResponse, null);
        }
    }

    public void persistResponse(AsynchronousSearchActiveContext asynchronousSearchContext, AsynchronousSearchPersistenceModel persistenceModel) {
        asynchronousSearchContext.acquireAllContextPermits((ActionListener<Releasable>)ActionListener.wrap(releasable -> {
            if (!asynchronousSearchContext.shouldPersist()) {
                logger.debug("Async search context [{}] has been closed while waiting to acquire permits for post processing", (Object)asynchronousSearchContext.getAsynchronousSearchId());
                releasable.close();
                return;
            }
            logger.debug("Persisting response for asynchronous search id [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId());
            try (ThreadContext.StoredContext ignore = this.threadPool.getThreadContext().stashContext();){
                this.asynchronousSearchPersistenceService.storeResponse(asynchronousSearchContext.getAsynchronousSearchId(), persistenceModel, (ActionListener<IndexResponse>)ActionListener.runAfter((ActionListener)ActionListener.wrap(indexResponse -> {
                    logger.debug("Successfully persisted response for asynchronous search id [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId());
                    try {
                        this.asynchronousSearchStateMachine.trigger(new SearchResponsePersistedEvent(asynchronousSearchContext));
                    }
                    catch (AsynchronousSearchStateMachineClosedException ex) {
                        logger.warn("Unexpected state, possibly caused by external task cancellation, context with id [{}] closed while triggering event [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId(), (Object)SearchResponsePersistedEvent.class.getName());
                    }
                    finally {
                        this.freeActiveContextConsumer.accept(asynchronousSearchContext);
                    }
                }, e -> {
                    try {
                        this.asynchronousSearchStateMachine.trigger(new SearchResponsePersistFailedEvent(asynchronousSearchContext));
                    }
                    catch (AsynchronousSearchStateMachineClosedException ex) {
                        logger.warn("Unexpected state, possibly caused by external task cancellation, context with id [{}] closed while triggering event [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId(), (Object)SearchResponsePersistFailedEvent.class.getName());
                    }
                    finally {
                        this.freeActiveContextConsumer.accept(asynchronousSearchContext);
                    }
                    logger.error(() -> new ParameterizedMessage("Failed to persist final response for [{}] due to [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId(), e));
                }), () -> ((Releasable)releasable).close()));
            }
        }, e -> {
            Throwable cause = ExceptionsHelper.unwrapCause((Throwable)e);
            Level level = cause instanceof AsynchronousSearchContextClosedException || cause instanceof TimeoutException ? Level.DEBUG : Level.WARN;
            logger.log(level, () -> new ParameterizedMessage("Exception  occured while acquiring the permit for asynchronousSearchContext [{}]", (Object)asynchronousSearchContext.getAsynchronousSearchId()), (Throwable)e);
            this.freeActiveContextConsumer.accept(asynchronousSearchContext);
        }), TimeValue.timeValueSeconds((long)120L), "persisting response");
    }

    private void handlePersist(AsynchronousSearchActiveContext asynchronousSearchContext) {
        if (asynchronousSearchContext.shouldPersist()) {
            try {
                this.asynchronousSearchStateMachine.trigger(new BeginPersistEvent(asynchronousSearchContext));
            }
            catch (AsynchronousSearchStateMachineClosedException e) {
                this.freeActiveContextConsumer.accept(asynchronousSearchContext);
            }
        } else {
            this.freeActiveContextConsumer.accept(asynchronousSearchContext);
        }
    }
}

