/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.ad.ratelimit;

import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ad.NodeStateManager;
import org.opensearch.ad.breaker.ADCircuitBreakerService;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.BatchWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.ratelimit.ResultWriteRequest;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.transport.ADResultBulkRequest;
import org.opensearch.ad.transport.ADResultBulkResponse;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.threadpool.ThreadPool;

public class ResultWriteWorker
extends BatchWorker<ResultWriteRequest, ADResultBulkRequest, ADResultBulkResponse> {
    private static final Logger LOG = LogManager.getLogger(ResultWriteWorker.class);
    public static final String WORKER_NAME = "result-write";
    private final MultiEntityResultHandler resultHandler;
    private NamedXContentRegistry xContentRegistry;

    public ResultWriteWorker(long heapSizeInBytes, int singleRequestSizeInBytes, Setting<Float> maxHeapPercentForQueueSetting, ClusterService clusterService, Random random, ADCircuitBreakerService adCircuitBreakerService, ThreadPool threadPool, Settings settings, float maxQueuedTaskRatio, Clock clock, float mediumSegmentPruneRatio, float lowSegmentPruneRatio, int maintenanceFreqConstant, Duration executionTtl, MultiEntityResultHandler resultHandler, NamedXContentRegistry xContentRegistry, NodeStateManager stateManager, Duration stateTtl) {
        super(WORKER_NAME, heapSizeInBytes, singleRequestSizeInBytes, maxHeapPercentForQueueSetting, clusterService, random, adCircuitBreakerService, threadPool, settings, maxQueuedTaskRatio, clock, mediumSegmentPruneRatio, lowSegmentPruneRatio, maintenanceFreqConstant, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_CONCURRENCY, executionTtl, AnomalyDetectorSettings.RESULT_WRITE_QUEUE_BATCH_SIZE, stateTtl, stateManager);
        this.resultHandler = resultHandler;
        this.xContentRegistry = xContentRegistry;
    }

    @Override
    protected void executeBatchRequest(ADResultBulkRequest request, ActionListener<ADResultBulkResponse> listener) {
        if (request.numberOfActions() < 1) {
            listener.onResponse(null);
            return;
        }
        this.resultHandler.flush(request, listener);
    }

    @Override
    protected ADResultBulkRequest toBatchRequest(List<ResultWriteRequest> toProcess) {
        ADResultBulkRequest bulkRequest = new ADResultBulkRequest();
        for (ResultWriteRequest request : toProcess) {
            bulkRequest.add(request);
        }
        return bulkRequest;
    }

    @Override
    protected ActionListener<ADResultBulkResponse> getResponseListener(List<ResultWriteRequest> toProcess, ADResultBulkRequest bulkRequest) {
        return ActionListener.wrap(adResultBulkResponse -> {
            if (adResultBulkResponse == null || !adResultBulkResponse.getRetryRequests().isPresent()) {
                return;
            }
            this.enqueueRetryRequestIteration(adResultBulkResponse.getRetryRequests().get(), 0);
        }, exception -> {
            if (ExceptionUtil.isRetryAble(exception)) {
                super.putAll(toProcess);
            } else if (ExceptionUtil.isOverloaded(exception)) {
                LOG.error("too many get AD model checkpoint requests or shard not avialble");
                this.setCoolDownStart();
            }
            for (ResultWriteRequest request : toProcess) {
                this.nodeStateManager.setException(request.getDetectorId(), (Exception)exception);
            }
            LOG.error("Fail to save results", (Throwable)exception);
        });
    }

    private void enqueueRetryRequestIteration(List<IndexRequest> requestToRetry, int index) {
        if (index >= requestToRetry.size()) {
            return;
        }
        DocWriteRequest currentRequest = (DocWriteRequest)requestToRetry.get(index);
        Optional<AnomalyResult> resultToRetry = this.getAnomalyResult(currentRequest);
        if (!resultToRetry.isPresent()) {
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
            return;
        }
        AnomalyResult result = resultToRetry.get();
        String detectorId = result.getDetectorId();
        this.nodeStateManager.getAnomalyDetector(detectorId, this.onGetDetector(requestToRetry, index, detectorId, result));
    }

    private ActionListener<Optional<AnomalyDetector>> onGetDetector(List<IndexRequest> requestToRetry, int index, String detectorId, AnomalyResult resultToRetry) {
        return ActionListener.wrap(detectorOptional -> {
            if (!detectorOptional.isPresent()) {
                LOG.warn((Message)new ParameterizedMessage("AnomalyDetector [{}] is not available.", (Object)detectorId));
                this.enqueueRetryRequestIteration(requestToRetry, index + 1);
                return;
            }
            AnomalyDetector detector = (AnomalyDetector)detectorOptional.get();
            super.put(new ResultWriteRequest(resultToRetry.getExecutionStartTime().toEpochMilli() + detector.getDetectorIntervalInMilliseconds(), detectorId, resultToRetry.isHighPriority() ? RequestPriority.HIGH : RequestPriority.MEDIUM, resultToRetry, detector.getResultIndex()));
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
        }, exception -> {
            LOG.error((Message)new ParameterizedMessage("fail to get detector [{}]", (Object)detectorId), (Throwable)exception);
            this.enqueueRetryRequestIteration(requestToRetry, index + 1);
        });
    }

    private Optional<AnomalyResult> getAnomalyResult(DocWriteRequest<?> request) {
        Optional<AnomalyResult> optional;
        block9: {
            if (!(request instanceof IndexRequest)) {
                LOG.error((Message)new ParameterizedMessage("We should only send IndexRquest, but get [{}].", request));
                return Optional.empty();
            }
            IndexRequest indexRequest = (IndexRequest)request;
            BytesReference indexSource = indexRequest.source();
            MediaType indexContentType = indexRequest.getContentType();
            XContentParser xContentParser = XContentHelper.createParser((NamedXContentRegistry)this.xContentRegistry, (DeprecationHandler)LoggingDeprecationHandler.INSTANCE, (BytesReference)indexSource, (MediaType)indexContentType);
            try {
                xContentParser.nextToken();
                optional = Optional.of(AnomalyResult.parse(xContentParser));
                if (xContentParser == null) break block9;
            }
            catch (Throwable throwable) {
                try {
                    if (xContentParser != null) {
                        try {
                            xContentParser.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    }
                    throw throwable;
                }
                catch (Exception e) {
                    LOG.error((Message)new ParameterizedMessage("Fail to parse index request [{}]", request), (Throwable)e);
                    return Optional.empty();
                }
            }
            xContentParser.close();
        }
        return optional;
    }
}

