/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateMaintainer;
import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl;
import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

public class GlobalStreamThread
extends Thread {
    /** Logger obtained from {@link #logContext} in the constructor. */
    private final Logger log;
    /** Log context built from {@link #logPrefix}. */
    private final LogContext logContext;
    private final StreamsConfig config;
    /** Consumer polled by the run loop; also the source of {@link #consumerMetrics()}. */
    private final Consumer<byte[], byte[]> globalConsumer;
    private final StateDirectory stateDirectory;
    private final Time time;
    /** Cache created in the constructor; resized from the run loop when a resize is pending. */
    private final ThreadCache cache;
    private final StreamsMetricsImpl streamsMetrics;
    private final ProcessorTopology topology;
    /** Pending cache size set by {@code resize(long)}; {@code -1} means no resize is pending. */
    private final AtomicLong cacheSize;
    /** Failure recorded by {@code initialize()}; {@code start()} rethrows it to its caller. */
    private volatile StreamsException startupException;
    /** Callback that {@code run()} invokes with any exception that ends the poll loop. */
    private java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler;
    /** Deadline (ms, on {@link #time}) for the pending client-instance-id fetch; {@code -1} means none pending. */
    private volatile long fetchDeadlineClientInstanceId = -1L;
    /** Future handed out by {@code globalConsumerInstanceId(Duration)} and completed from the run loop. */
    private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new KafkaFutureImpl<>();
    /** Counted down when {@code initialize()} finishes (successfully or not) and on {@code shutdown()}. */
    private final CountDownLatch initializationLatch = new CountDownLatch(1);
    /** Current lifecycle state; written only while holding {@link #stateLock}. */
    private volatile State state = State.CREATED;
    private final Object stateLock = new Object();
    /** Optional listener notified by {@code setState(State)} after a transition; may be {@code null}. */
    private StreamThread.StateListener stateListener = null;
    /** Prefix of the form {@code "global-stream-thread [<threadClientId>] "}. */
    private final String logPrefix;
    private final StateRestoreListener stateRestoreListener;

    /**
     * Installs the listener that {@code setState(State)} calls after each state transition.
     *
     * @param listener the listener to notify; {@code null} disables notification
     */
    public void setStateListener(StreamThread.StateListener listener) {
        stateListener = listener;
    }

    /**
     * Returns the thread's current lifecycle state (a plain read of the volatile field,
     * without taking the state lock).
     *
     * @return the current {@link State}
     */
    public State state() {
        final State current = state;
        return current;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Attempts to move this thread to {@code newState} and, on success, notifies the
     * registered state listener (outside the lock).
     *
     * <p>The request is silently ignored when the thread is already {@code DEAD}, or when
     * it is already {@code PENDING_SHUTDOWN} and {@code PENDING_SHUTDOWN} is requested again.
     *
     * @param newState the state to transition to
     * @throws StreamsException if the transition is not allowed from the current state
     */
    private void setState(State newState) {
        final State oldState;
        synchronized (this.stateLock) {
            // Capture the previous state while holding the lock: reading it before entering
            // the lock could report a stale value if another thread transitions concurrently.
            oldState = this.state;
            if (oldState == State.PENDING_SHUTDOWN && newState == State.PENDING_SHUTDOWN) {
                return;
            }
            if (oldState == State.DEAD) {
                return;
            }
            if (!oldState.isValidTransition(newState)) {
                this.log.error("Unexpected state transition from {} to {}", oldState, newState);
                throw new StreamsException(this.logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
            }
            this.log.info("State transition from {} to {}", oldState, newState);
            this.state = newState;
        }
        // Report the transition that was actually performed here; re-reading this.state
        // after releasing the lock could observe a later transition made by another thread.
        final StreamThread.StateListener listener = this.stateListener;
        if (listener != null) {
            listener.onChange(this, newState, oldState);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the thread is in the {@code RUNNING} state, checked under the state lock.
     *
     * @return {@code true} if the current state is {@code RUNNING}
     */
    public boolean stillRunning() {
        final boolean running;
        synchronized (stateLock) {
            running = state.isRunning();
        }
        return running;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Reports whether the thread is shutting down or already dead, checked under the state lock.
     *
     * @return {@code true} if the current state is {@code PENDING_SHUTDOWN} or {@code DEAD}
     */
    public boolean inErrorState() {
        final boolean errorState;
        synchronized (stateLock) {
            errorState = state.inErrorState();
        }
        return errorState;
    }

    /**
     * Creates the global stream thread. The thread is named {@code threadClientId}; nothing is
     * started or initialized until {@link #start()} is called.
     *
     * @param topology                        topology passed to the state manager and update task
     * @param config                          streams configuration
     * @param globalConsumer                  consumer used to read the global topics
     * @param stateDirectory                  state directory passed to the state manager
     * @param cacheSizeBytes                  initial size of this thread's cache
     * @param streamsMetrics                  metrics registry for thread-level sensors and metrics
     * @param time                            clock used for deadlines and passed to collaborators
     * @param threadClientId                  thread name, also embedded in the log prefix
     * @param stateRestoreListener            listener passed to the state manager
     * @param streamsUncaughtExceptionHandler callback invoked when the run loop fails
     */
    public GlobalStreamThread(ProcessorTopology topology, StreamsConfig config, Consumer<byte[], byte[]> globalConsumer, StateDirectory stateDirectory, long cacheSizeBytes, StreamsMetricsImpl streamsMetrics, Time time, String threadClientId, StateRestoreListener stateRestoreListener, java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
        super(threadClientId);

        // Plain collaborators supplied by the caller.
        this.topology = topology;
        this.config = config;
        this.globalConsumer = globalConsumer;
        this.stateDirectory = stateDirectory;
        this.streamsMetrics = streamsMetrics;
        this.time = time;
        this.stateRestoreListener = stateRestoreListener;
        this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;

        // Logging is set up before the cache because the cache takes the log context.
        this.logPrefix = "global-stream-thread [" + threadClientId + "] ";
        this.logContext = new LogContext(this.logPrefix);
        this.log = this.logContext.logger(this.getClass());
        this.cache = new ThreadCache(this.logContext, cacheSizeBytes, this.streamsMetrics);

        // -1 means "no resize requested".
        this.cacheSize = new AtomicLong(-1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Thread body. Bootstraps the global state through {@code initialize()} and then, while the
     * thread is {@code RUNNING}, repeatedly applies any pending cache resize, polls the global
     * consumer and updates the state, and serves a pending client-instance-id request.
     *
     * <p>If initialization fails the thread goes straight to {@code DEAD}. If the loop fails,
     * the exception is passed to the uncaught-exception handler; an
     * {@link InvalidOffsetException} additionally causes the state consumer to be closed with
     * the wipe flag set. In all cases the state consumer is closed, the thread-level
     * sensors and metrics are removed, and the state ends in {@code DEAD}.
     */
    @Override
    public void run() {
        final StateConsumer stateConsumer = this.initialize();
        if (stateConsumer == null) {
            // initialize() returned null, i.e. it recorded a startup failure: walk the state
            // machine to DEAD and drop this thread's sensors and metrics.
            this.setState(State.PENDING_SHUTDOWN);
            this.setState(State.DEAD);
            this.log.error("Error happened during initialization of the global state store; this thread has shutdown.");
            this.streamsMetrics.removeAllThreadLevelSensors(this.getName());
            this.streamsMetrics.removeAllThreadLevelMetrics(this.getName());
            return;
        }

        boolean wipeStateStore = false;
        try {
            while (this.stillRunning()) {
                // Atomically take (and clear) a pending resize request.
                final long size = this.cacheSize.getAndSet(-1L);
                if (size != -1L) {
                    this.cache.resize(size);
                }
                stateConsumer.pollAndUpdate();
                this.maybeFetchClientInstanceId();
            }
        } catch (final InvalidOffsetException recoverableException) {
            wipeStateStore = true;
            this.log.error("Updating global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", recoverableException);
            final StreamsException e = new StreamsException("Updating global state failed. You can restart KafkaStreams to launch a new GlobalStreamThread to recover from this error.", recoverableException);
            this.streamsUncaughtExceptionHandler.accept(e);
        } catch (final Exception e) {
            this.log.error("Error happened while maintaining global state store. The streams application or client will now close to ERROR.", e);
            this.streamsUncaughtExceptionHandler.accept(e);
        } finally {
            // May already be PENDING_SHUTDOWN (e.g. after shutdown()); setState ignores that case.
            this.setState(State.PENDING_SHUTDOWN);
            this.log.info("Shutting down");
            try {
                stateConsumer.close(wipeStateStore);
            } catch (final IOException e) {
                this.log.error("Failed to close state maintainer due to the following error:", e);
            }
            this.streamsMetrics.removeAllThreadLevelSensors(this.getName());
            this.streamsMetrics.removeAllThreadLevelMetrics(this.getName());
            this.setState(State.DEAD);
            this.log.info("Shutdown complete");
        }
    }

    /**
     * Serves a pending {@code globalConsumerInstanceId(Duration)} request, if there is one.
     *
     * <p>While the deadline has not passed, asks the global consumer for its client instance id
     * without blocking and completes the future with the outcome; a {@link TimeoutException}
     * leaves the request pending so it is retried on the next loop iteration. Once the deadline
     * has passed, the future is failed with a {@link TimeoutException}.
     */
    private void maybeFetchClientInstanceId() {
        if (this.fetchDeadlineClientInstanceId == -1L) {
            return;
        }
        if (this.fetchDeadlineClientInstanceId >= this.time.milliseconds()) {
            try {
                this.clientInstanceIdFuture.complete(this.globalConsumer.clientInstanceId(Duration.ZERO));
                this.fetchDeadlineClientInstanceId = -1L;
            } catch (final IllegalStateException disabledError) {
                // NOTE(review): presumably raised when telemetry is disabled on the client;
                // the future is completed with null instead of failing - confirm.
                this.clientInstanceIdFuture.complete(null);
                this.fetchDeadlineClientInstanceId = -1L;
            } catch (final TimeoutException retryOnNextIteration) {
                // Deadline deliberately left in place: try again after the next poll.
            } catch (final Exception error) {
                this.clientInstanceIdFuture.completeExceptionally(error);
                this.fetchDeadlineClientInstanceId = -1L;
            }
        } else {
            this.clientInstanceIdFuture.completeExceptionally(new TimeoutException("Could not retrieve global consumer client instance id."));
            this.fetchDeadlineClientInstanceId = -1L;
        }
    }

    /**
     * Replaces the callback that {@code run()} invokes when the poll loop fails.
     *
     * @param streamsUncaughtExceptionHandler the new handler
     */
    public void setUncaughtExceptionHandler(final java.util.function.Consumer<Throwable> streamsUncaughtExceptionHandler) {
        final java.util.function.Consumer<Throwable> replacement = streamsUncaughtExceptionHandler;
        this.streamsUncaughtExceptionHandler = replacement;
    }

    /**
     * Records a requested cache size. The request is only stored here; the run loop picks it
     * up and resizes the cache on a later iteration.
     *
     * @param cacheSize the requested cache size
     */
    public void resize(long cacheSize) {
        final AtomicLong pending = this.cacheSize;
        pending.set(cacheSize);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Builds the state manager, processor context, metrics reporter and state consumer, runs
     * the consumer's bootstrap, and moves the thread to {@code RUNNING}.
     *
     * <p>On any failure the state consumer (if already created) is closed, the failure is
     * stored in {@code startupException}, and {@code null} is returned. The initialization
     * latch is counted down on every path so that {@code start()} can return.
     *
     * @return the initialized state consumer, or {@code null} if initialization failed
     */
    private StateConsumer initialize() {
        StateConsumer stateConsumer = null;
        try {
            final GlobalStateManagerImpl stateMgr = new GlobalStateManagerImpl(
                logContext, time, topology, globalConsumer, stateDirectory, stateRestoreListener, config);
            final GlobalProcessorContextImpl globalProcessorContext =
                new GlobalProcessorContextImpl(config, stateMgr, streamsMetrics, cache, time);
            stateMgr.setGlobalProcessorContext(globalProcessorContext);

            final MetricsReporter globalMetricsReporter =
                new StreamsThreadMetricsDelegatingReporter(globalConsumer, getName(), Optional.empty());
            streamsMetrics.metricsRegistry().addReporter(globalMetricsReporter);

            final GlobalStateUpdateTask updateTask = new GlobalStateUpdateTask(
                logContext,
                topology,
                globalProcessorContext,
                stateMgr,
                config.deserializationExceptionHandler(),
                time,
                config.getLong("commit.interval.ms"));
            final Duration pollTime = Duration.ofMillis(config.getLong("poll.ms"));
            stateConsumer = new StateConsumer(logContext, globalConsumer, updateTask, pollTime);

            try {
                stateConsumer.initialize();
            } catch (final InvalidOffsetException recoverableException) {
                log.error("Bootstrapping global state failed due to inconsistent local state. Will attempt to clean up the local state. You can restart KafkaStreams to recover from this error.", recoverableException);
                // Close with the wipe flag set. The StreamsException thrown next is handled by
                // the outer catch, which calls closeStateConsumer once more (wipe flag unset).
                closeStateConsumer(stateConsumer, true);
                throw new StreamsException("Bootstrapping global state failed. You can restart KafkaStreams to recover from this error.", recoverableException);
            }

            setState(State.RUNNING);
            return stateConsumer;
        } catch (final StreamsException fatalException) {
            closeStateConsumer(stateConsumer, false);
            startupException = fatalException;
        } catch (final Exception fatalException) {
            closeStateConsumer(stateConsumer, false);
            startupException = new StreamsException("Exception caught during initialization of GlobalStreamThread", fatalException);
        } finally {
            // Always release start(), whether initialization succeeded or failed.
            initializationLatch.countDown();
        }
        return null;
    }

    /**
     * Closes the given state consumer if there is one, logging (not propagating) an
     * {@link IOException} raised by the close.
     *
     * @param stateConsumer  the consumer to close; {@code null} is a no-op
     * @param wipeStateStore flag forwarded to {@code StateConsumer.close(boolean)}
     */
    private void closeStateConsumer(StateConsumer stateConsumer, boolean wipeStateStore) {
        if (stateConsumer == null) {
            return;
        }
        try {
            stateConsumer.close(wipeStateStore);
        } catch (final IOException closeFailure) {
            log.error("Failed to close state consumer due to the following error:", closeFailure);
        }
    }

    /**
     * Starts the thread and blocks until the initialization latch is released, then surfaces
     * any startup failure to the caller.
     *
     * @throws StreamsException      if initialization recorded a startup exception
     * @throws IllegalStateException if the wait was interrupted (the interrupt flag is restored),
     *                               or if the thread is already shutting down or dead
     */
    @Override
    public synchronized void start() {
        super.start();

        try {
            initializationLatch.await();
        } catch (final InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("GlobalStreamThread was interrupted during initialization", interrupted);
        }

        final StreamsException failure = startupException;
        if (failure != null) {
            throw failure;
        }

        if (inErrorState()) {
            throw new IllegalStateException("Initialization for the global stream thread failed");
        }
    }

    /**
     * Requests shutdown: moves the thread to {@code PENDING_SHUTDOWN} and then counts down the
     * initialization latch so a caller blocked in {@code start()} is released.
     */
    public void shutdown() {
        setState(State.PENDING_SHUTDOWN);
        initializationLatch.countDown();
    }

    /**
     * Exposes the global consumer's metrics through an unmodifiable map wrapper.
     *
     * @return an unmodifiable view of the global consumer's metrics
     */
    public Map<MetricName, Metric> consumerMetrics() {
        final Map<MetricName, ? extends Metric> consumerMetrics = globalConsumer.metrics();
        return Collections.unmodifiableMap(consumerMetrics);
    }

    /**
     * Returns a future for the global consumer's client instance id; the future is completed
     * by the run loop.
     *
     * <p>If the current future already completed successfully it is returned as is. If it
     * completed exceptionally it is replaced by a fresh one. Whenever the returned future is
     * still pending, the fetch deadline is (re)set to "now + {@code timeout}".
     *
     * @param timeout how long the run loop may keep trying before failing the future
     * @return the future that will carry the client instance id
     */
    public KafkaFuture<Uuid> globalConsumerInstanceId(Duration timeout) {
        boolean setDeadline = false;
        if (this.clientInstanceIdFuture.isDone()) {
            if (this.clientInstanceIdFuture.isCompletedExceptionally()) {
                // Previous attempt failed: hand out a fresh future and try again.
                this.clientInstanceIdFuture = new KafkaFutureImpl<>();
                setDeadline = true;
            }
        } else {
            setDeadline = true;
        }
        if (setDeadline) {
            final long nowMs = this.time.milliseconds();
            final long timeoutMs = timeout.toMillis();
            // Saturate instead of overflowing: a huge timeout would otherwise wrap to a negative
            // deadline, which the run loop would treat as already expired.
            final boolean overflows = timeoutMs > 0L && nowMs > Long.MAX_VALUE - timeoutMs;
            this.fetchDeadlineClientInstanceId = overflows ? Long.MAX_VALUE : nowMs + timeoutMs;
        }
        return this.clientInstanceIdFuture;
    }

    /**
     * Lifecycle states of the global stream thread. Each constant lists the ordinals of the
     * states it may transition to:
     *
     * <pre>
     *   CREATED (0)          -&gt; RUNNING (1), PENDING_SHUTDOWN (2)
     *   RUNNING (1)          -&gt; PENDING_SHUTDOWN (2)
     *   PENDING_SHUTDOWN (2) -&gt; DEAD (3)
     *   DEAD (3)             -&gt; (none)
     * </pre>
     *
     * Because the table is ordinal-based, the declaration order of the constants must not change
     * without updating the numbers.
     */
    public enum State implements ThreadStateTransitionValidator {
        CREATED(1, 2),
        RUNNING(2),
        PENDING_SHUTDOWN(3),
        DEAD;

        /** Ordinals of the states reachable from this one. */
        private final Set<Integer> validTransitions;

        State(final Integer... validTransitions) {
            this.validTransitions = new HashSet<>(Arrays.asList(validTransitions));
        }

        /** @return {@code true} only for {@code RUNNING} */
        public boolean isRunning() {
            return this == RUNNING;
        }

        /** @return {@code true} for {@code PENDING_SHUTDOWN} and {@code DEAD} */
        public boolean inErrorState() {
            return this == DEAD || this == PENDING_SHUTDOWN;
        }

        /**
         * Checks whether moving from this state to {@code newState} is allowed.
         *
         * @param newState the target state; must be a {@link State}
         * @return {@code true} if the target's ordinal is listed for this state
         */
        @Override
        public boolean isValidTransition(final ThreadStateTransitionValidator newState) {
            final int targetOrdinal = ((State) newState).ordinal();
            return validTransitions.contains(targetOrdinal);
        }
    }

    /**
     * Couples the global consumer with a {@link GlobalStateMaintainer}: records polled from the
     * consumer are handed to the maintainer one by one.
     */
    static class StateConsumer {
        private final Consumer<byte[], byte[]> globalConsumer;
        private final GlobalStateMaintainer stateMaintainer;
        private final Duration pollTime;
        private final Logger log;

        /**
         * @param logContext      context used to create this class's logger
         * @param globalConsumer  consumer to assign, seek, poll and close
         * @param stateMaintainer receiver of the polled records
         * @param pollTime        timeout passed to every {@code poll} call
         */
        StateConsumer(LogContext logContext, Consumer<byte[], byte[]> globalConsumer, GlobalStateMaintainer stateMaintainer, Duration pollTime) {
            this.log = logContext.logger(this.getClass());
            this.globalConsumer = globalConsumer;
            this.stateMaintainer = stateMaintainer;
            this.pollTime = pollTime;
        }

        /**
         * Initializes the maintainer, assigns the partitions it returns to the consumer, and
         * seeks each partition to the offset the maintainer returned for it.
         */
        void initialize() {
            final Map<TopicPartition, Long> partitionOffsets = this.stateMaintainer.initialize();
            this.globalConsumer.assign(partitionOffsets.keySet());
            for (final Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
                this.globalConsumer.seek(entry.getKey(), entry.getValue());
            }
        }

        /**
         * Polls once, passes every received record to the maintainer, then lets the maintainer
         * decide whether to checkpoint.
         */
        void pollAndUpdate() {
            final ConsumerRecords<byte[], byte[]> received = this.globalConsumer.poll(this.pollTime);
            for (final ConsumerRecord<byte[], byte[]> consumerRecord : received) {
                this.stateMaintainer.update(consumerRecord);
            }
            this.stateMaintainer.maybeCheckpoint();
        }

        /**
         * Closes the consumer and then the maintainer.
         *
         * @param wipeStateStore flag forwarded to the maintainer's {@code close}
         * @throws IOException if closing the maintainer fails
         */
        public void close(boolean wipeStateStore) throws IOException {
            try {
                this.globalConsumer.close();
            } catch (final RuntimeException e) {
                // Only log: the maintainer must still get its chance to close.
                this.log.error("Failed to close global consumer due to the following error:", e);
            }
            this.stateMaintainer.close(wipeStateStore);
        }
    }
}

