/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import com.google.re2j.Pattern;
import com.google.re2j.PatternSyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.FencedMemberEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidRegularExpression;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.message.AlterShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.AlterShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.ConsumerProtocolAssignment;
import org.apache.kafka.common.message.ConsumerProtocolSubscription;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
import org.apache.kafka.common.message.DeleteShareGroupOffsetsResponseData;
import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.SchemaException;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.common.runtime.CoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.assignor.SimpleAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyKey;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
import org.apache.kafka.coordinator.group.streams.CurrentAssignmentBuilder;
import org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredSubtopology;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.coordinator.group.streams.topics.EndpointToPartitionsManager;
import org.apache.kafka.coordinator.group.streams.topics.InternalTopicManager;
import org.apache.kafka.coordinator.group.streams.topics.TopicConfigurationException;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.share.persister.DeleteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.InitializeShareGroupStateParameters;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.slf4j.Logger;

public class GroupMetadataManager {
    private static final int METADATA_REFRESH_INTERVAL_MS = Integer.MAX_VALUE;
    static final long REGEX_BATCH_REFRESH_MIN_INTERVAL_MS = 10000L;
    private final LogContext logContext;
    private final Logger log;
    private final SnapshotRegistry snapshotRegistry;
    private final Time time;
    private final CoordinatorTimer<Void, CoordinatorRecord> timer;
    private final CoordinatorExecutor<CoordinatorRecord> executor;
    private final GroupCoordinatorMetricsShard metrics;
    private final GroupCoordinatorConfig config;
    private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors;
    private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor;
    private final TimelineHashMap<String, Group> groups;
    private final TimelineHashMap<String, TimelineHashSet<String>> groupsByTopics;
    private final TimelineHashMap<String, ShareGroup.ShareGroupStatePartitionMetadataInfo> shareGroupStatePartitionMetadata;
    private final GroupConfigManager groupConfigManager;
    private final Map<String, TaskAssignor> streamsGroupAssignors;
    private MetadataImage metadataImage;
    private final Map<String, Long> topicHashCache;
    private long lastMetadataImageWithNewTopics = -1L;
    static final CoordinatorResult<Void, CoordinatorRecord> EMPTY_RESULT = new CoordinatorResult(List.of(), CompletableFuture.completedFuture(null), false);
    private final ShareGroupPartitionAssignor shareGroupAssignor;
    private final Optional<Plugin<Authorizer>> authorizerPlugin;

    private GroupMetadataManager(SnapshotRegistry snapshotRegistry, LogContext logContext, Time time, CoordinatorTimer<Void, CoordinatorRecord> timer, CoordinatorExecutor<CoordinatorRecord> executor, GroupCoordinatorMetricsShard metrics, MetadataImage metadataImage, GroupCoordinatorConfig config, GroupConfigManager groupConfigManager, ShareGroupPartitionAssignor shareGroupAssignor, Optional<Plugin<Authorizer>> authorizerPlugin, List<TaskAssignor> streamsGroupAssignors) {
        this.logContext = logContext;
        this.log = logContext.logger(GroupMetadataManager.class);
        this.snapshotRegistry = snapshotRegistry;
        this.time = time;
        this.timer = timer;
        this.executor = executor;
        this.metrics = metrics;
        this.config = config;
        this.metadataImage = metadataImage;
        this.consumerGroupAssignors = config.consumerGroupAssignors().stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
        this.defaultConsumerGroupAssignor = config.consumerGroupAssignors().get(0);
        this.groups = new TimelineHashMap(snapshotRegistry, 0);
        this.groupsByTopics = new TimelineHashMap(snapshotRegistry, 0);
        this.shareGroupStatePartitionMetadata = new TimelineHashMap(snapshotRegistry, 0);
        this.groupConfigManager = groupConfigManager;
        this.shareGroupAssignor = shareGroupAssignor;
        this.authorizerPlugin = authorizerPlugin;
        this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity()));
        this.topicHashCache = new HashMap<String, Long>();
    }

    public MetadataImage image() {
        return this.metadataImage;
    }

    public Group group(String groupId) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId, Long.MAX_VALUE);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
        }
        return group;
    }

    public Group group(String groupId, long committedOffset) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId, committedOffset);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Group %s not found.", groupId));
        }
        return group;
    }

    public List<ListGroupsResponseData.ListedGroup> listGroups(Set<String> statesFilter, Set<String> typesFilter, long committedOffset) {
        Set caseInsensitiveFilterSet = statesFilter.stream().map(String::toLowerCase).map(String::trim).collect(Collectors.toSet());
        Set enumTypesFilter = typesFilter.stream().map(Group.GroupType::parse).collect(Collectors.toSet());
        Predicate<Group> combinedFilter = group -> {
            boolean stateCheck = statesFilter.isEmpty() || group.isInStates(caseInsensitiveFilterSet, committedOffset);
            boolean typeCheck = enumTypesFilter.isEmpty() || enumTypesFilter.contains((Object)group.type());
            return stateCheck && typeCheck;
        };
        Stream groupStream = this.groups.values(committedOffset).stream();
        return groupStream.filter(combinedFilter).map(group -> group.asListedGroup(committedOffset)).toList();
    }

    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(List<String> groupIds, long committedOffset) {
        ArrayList<ConsumerGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<ConsumerGroupDescribeResponseData.DescribedGroup>();
        groupIds.forEach(groupId -> {
            try {
                describedGroups.add(this.consumerGroup((String)groupId, committedOffset).asDescribedGroup(committedOffset, this.defaultConsumerGroupAssignor.name(), this.metadataImage.topics()));
            }
            catch (GroupIdNotFoundException exception) {
                describedGroups.add(new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()).setErrorMessage(exception.getMessage()));
            }
        });
        return describedGroups;
    }

    public List<ShareGroupDescribeResponseData.DescribedGroup> shareGroupDescribe(List<String> groupIds, long committedOffset) {
        ArrayList<ShareGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<ShareGroupDescribeResponseData.DescribedGroup>();
        groupIds.forEach(groupId -> {
            try {
                describedGroups.add(this.shareGroup((String)groupId, committedOffset).asDescribedGroup(committedOffset, this.shareGroupAssignor.name(), this.metadataImage.topics()));
            }
            catch (GroupIdNotFoundException exception) {
                describedGroups.add(new ShareGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()).setErrorMessage(exception.getMessage()));
            }
        });
        return describedGroups;
    }

    public List<StreamsGroupDescribeResponseData.DescribedGroup> streamsGroupDescribe(List<String> groupIds, long committedOffset) {
        ArrayList<StreamsGroupDescribeResponseData.DescribedGroup> describedGroups = new ArrayList<StreamsGroupDescribeResponseData.DescribedGroup>();
        groupIds.forEach(groupId -> {
            try {
                describedGroups.add(this.streamsGroup((String)groupId, committedOffset).asDescribedGroup(committedOffset));
            }
            catch (GroupIdNotFoundException exception) {
                describedGroups.add(new StreamsGroupDescribeResponseData.DescribedGroup().setGroupId(groupId).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()).setErrorMessage(exception.getMessage()));
            }
        });
        return describedGroups;
    }

    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(AuthorizableRequestContext context, List<String> groupIds, long committedOffset) {
        ArrayList<DescribeGroupsResponseData.DescribedGroup> describedGroups = new ArrayList<DescribeGroupsResponseData.DescribedGroup>();
        groupIds.forEach(groupId -> {
            try {
                ClassicGroup group = this.classicGroup((String)groupId, committedOffset);
                if (group.isInState(ClassicGroupState.STABLE)) {
                    if (group.protocolName().isEmpty()) {
                        throw new IllegalStateException("Invalid null group protocol for stable group");
                    }
                    describedGroups.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(group.stateAsString()).setProtocolType(group.protocolType().orElse("")).setProtocolData(group.protocolName().get()).setMembers(group.allMembers().stream().map(member -> member.describe(group.protocolName().get())).toList()));
                } else {
                    describedGroups.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(group.stateAsString()).setProtocolType(group.protocolType().orElse("")).setMembers(group.allMembers().stream().map(ClassicGroupMember::describeNoMetadata).toList()));
                }
            }
            catch (GroupIdNotFoundException exception) {
                if (context.requestVersion() >= 6) {
                    describedGroups.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(ClassicGroupState.DEAD.toString()).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()).setErrorMessage(exception.getMessage()));
                }
                describedGroups.add(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(ClassicGroupState.DEAD.toString()));
            }
        });
        return describedGroups;
    }

    ConsumerGroup getOrMaybeCreateConsumerGroup(String groupId, boolean createIfNotExists, List<CoordinatorRecord> records) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
        }
        if (group == null) {
            return new ConsumerGroup(this.snapshotRegistry, groupId, this.metrics);
        }
        if (createIfNotExists && this.maybeDeleteEmptyClassicGroup(group, records)) {
            this.log.info("[GroupId {}] Converted the empty classic group to a consumer group.", (Object)groupId);
            return new ConsumerGroup(this.snapshotRegistry, groupId, this.metrics);
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return (ConsumerGroup)group;
        }
        if (createIfNotExists && group.type() == Group.GroupType.CLASSIC) {
            this.validateOnlineUpgrade((ClassicGroup)group);
            return this.convertToConsumerGroup((ClassicGroup)group, records);
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
    }

    StreamsGroup getOrCreateStreamsGroup(String groupId) {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null) {
            return new StreamsGroup(this.logContext, this.snapshotRegistry, groupId, this.metrics);
        }
        return this.castToStreamsGroup(group);
    }

    StreamsGroup getStreamsGroupOrThrow(String groupId) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null) {
            throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
        }
        return this.castToStreamsGroup(group);
    }

    private StreamsGroup castToStreamsGroup(Group group) {
        if (group.type() == Group.GroupType.STREAMS) {
            return (StreamsGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", group.groupId()));
    }

    private StreamsGroup streamsGroup(String groupId, long committedOffset) throws GroupIdNotFoundException {
        Group group = this.group(groupId, committedOffset);
        if (group.type() == Group.GroupType.STREAMS) {
            return (StreamsGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a streams group.", groupId));
    }

    StreamsGroup streamsGroup(String groupId) throws GroupIdNotFoundException {
        return this.streamsGroup(groupId, Long.MAX_VALUE);
    }

    public ConsumerGroup consumerGroup(String groupId, long committedOffset) throws GroupIdNotFoundException {
        Group group = this.group(groupId, committedOffset);
        if (group.type() == Group.GroupType.CONSUMER) {
            return (ConsumerGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a consumer group.", groupId));
    }

    ConsumerGroup consumerGroup(String groupId) throws GroupIdNotFoundException {
        return this.consumerGroup(groupId, Long.MAX_VALUE);
    }

    ConsumerGroup getOrMaybeCreatePersistedConsumerGroup(String groupId, boolean createIfNotExists) throws GroupIdNotFoundException, IllegalStateException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Consumer group %s not found.", groupId));
        }
        if (group == null) {
            ConsumerGroup consumerGroup = new ConsumerGroup(this.snapshotRegistry, groupId, this.metrics);
            this.groups.put((Object)groupId, (Object)consumerGroup);
            return consumerGroup;
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return (ConsumerGroup)group;
        }
        if (group.type() == Group.GroupType.CLASSIC && ((ClassicGroup)group).isSimpleGroup()) {
            ConsumerGroup consumerGroup = new ConsumerGroup(this.snapshotRegistry, groupId, this.metrics);
            this.groups.put((Object)groupId, (Object)consumerGroup);
            return consumerGroup;
        }
        throw new IllegalStateException(String.format("Group %s is not a consumer group", groupId));
    }

    private StreamsGroup getOrMaybeCreatePersistedStreamsGroup(String groupId, boolean createIfNotExists) throws GroupIdNotFoundException, IllegalStateException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId));
        }
        if (group == null) {
            StreamsGroup streamsGroup = new StreamsGroup(this.logContext, this.snapshotRegistry, groupId, this.metrics);
            this.groups.put((Object)groupId, (Object)streamsGroup);
            return streamsGroup;
        }
        if (group.type() == Group.GroupType.STREAMS) {
            return (StreamsGroup)group;
        }
        throw new IllegalStateException(String.format("Group %s is not a streams group.", groupId));
    }

    ClassicGroup getOrMaybeCreateClassicGroup(String groupId, boolean createIfNotExists) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Classic group %s not found.", groupId));
        }
        if (group == null) {
            ClassicGroup classicGroup = new ClassicGroup(this.logContext, groupId, ClassicGroupState.EMPTY, this.time);
            this.groups.put((Object)groupId, (Object)classicGroup);
            return classicGroup;
        }
        if (group.type() == Group.GroupType.CLASSIC) {
            return (ClassicGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", groupId));
    }

    public ClassicGroup classicGroup(String groupId, long committedOffset) throws GroupIdNotFoundException {
        Group group = this.group(groupId, committedOffset);
        if (group.type() == Group.GroupType.CLASSIC) {
            return (ClassicGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a classic group.", groupId));
    }

    private ShareGroup getOrMaybeCreateShareGroup(String groupId, boolean createIfNotExists) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Share group %s not found.", groupId));
        }
        if (group == null) {
            return new ShareGroup(this.snapshotRegistry, groupId);
        }
        if (group.type() == Group.GroupType.SHARE) {
            return (ShareGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
    }

    ShareGroup getOrMaybeCreatePersistedShareGroup(String groupId, boolean createIfNotExists) throws GroupIdNotFoundException {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group == null && !createIfNotExists) {
            throw new GroupIdNotFoundException(String.format("Share group %s not found.", groupId));
        }
        if (group == null) {
            ShareGroup shareGroup = new ShareGroup(this.snapshotRegistry, groupId);
            this.groups.put((Object)groupId, (Object)shareGroup);
            return shareGroup;
        }
        if (group.type() == Group.GroupType.SHARE) {
            return (ShareGroup)group;
        }
        throw new IllegalStateException(String.format("Group %s is not a share group.", groupId));
    }

    public ShareGroup shareGroup(String groupId, long committedOffset) throws GroupIdNotFoundException {
        Group group = this.group(groupId, committedOffset);
        if (group.type() == Group.GroupType.SHARE) {
            return (ShareGroup)group;
        }
        throw new GroupIdNotFoundException(String.format("Group %s is not a share group.", groupId));
    }

    ShareGroup shareGroup(String groupId) throws GroupIdNotFoundException {
        return this.shareGroup(groupId, Long.MAX_VALUE);
    }

    private boolean validateOnlineDowngradeWithFencedMembers(ConsumerGroup consumerGroup, Set<ConsumerGroupMember> fencedMembers) {
        if (!consumerGroup.allMembersUseClassicProtocolExcept(fencedMembers)) {
            return false;
        }
        if (consumerGroup.numMembers() - fencedMembers.size() <= 0) {
            this.log.debug("Skip downgrading the consumer group {} to classic group because it's empty.", (Object)consumerGroup.groupId());
            return false;
        }
        if (!this.config.consumerGroupMigrationPolicy().isDowngradeEnabled()) {
            this.log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", (Object)consumerGroup.groupId());
            return false;
        }
        if (consumerGroup.numMembers() - fencedMembers.size() > this.config.classicGroupMaxSize()) {
            this.log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", (Object)consumerGroup.groupId());
            return false;
        }
        return true;
    }

    private boolean validateOnlineDowngradeWithReplacedMember(ConsumerGroup consumerGroup, ConsumerGroupMember replacedMember) {
        if (!consumerGroup.allMembersUseClassicProtocolExcept(replacedMember)) {
            return false;
        }
        if (!this.config.consumerGroupMigrationPolicy().isDowngradeEnabled()) {
            this.log.info("Cannot downgrade consumer group {} to classic group because the online downgrade is disabled.", (Object)consumerGroup.groupId());
            return false;
        }
        if (consumerGroup.numMembers() > this.config.classicGroupMaxSize()) {
            this.log.info("Cannot downgrade consumer group {} to classic group because its group size is greater than classic group max size.", (Object)consumerGroup.groupId());
            return false;
        }
        return true;
    }

    private void convertToClassicGroup(ConsumerGroup consumerGroup, Set<ConsumerGroupMember> leavingMembers, ConsumerGroupMember joiningMember, List<CoordinatorRecord> records) {
        ClassicGroup classicGroup;
        if (joiningMember == null) {
            consumerGroup.createGroupTombstoneRecords(records);
        } else {
            ConsumerGroupMember replacedMember = consumerGroup.staticMember(joiningMember.instanceId());
            if (replacedMember == null) {
                throw new IllegalArgumentException("joiningMember must be a static member when not null.");
            }
            consumerGroup.createGroupTombstoneRecordsWithReplacedMember(records, replacedMember.memberId(), joiningMember.memberId());
        }
        try {
            classicGroup = ClassicGroup.fromConsumerGroup(consumerGroup, leavingMembers, joiningMember, this.logContext, this.time, this.metadataImage);
        }
        catch (SchemaException e) {
            this.log.warn("Cannot downgrade the consumer group " + consumerGroup.groupId() + ": fail to parse the Consumer Protocol consumer.", (Throwable)e);
            throw new GroupIdNotFoundException(String.format("Cannot downgrade the classic group %s: %s.", consumerGroup.groupId(), e.getMessage()));
        }
        classicGroup.createClassicGroupRecords(records);
        this.removeGroup(consumerGroup.groupId());
        this.groups.put((Object)consumerGroup.groupId(), (Object)classicGroup);
        classicGroup.allMembers().forEach(member -> this.rescheduleClassicGroupMemberHeartbeat(classicGroup, (ClassicGroupMember)member));
        if (joiningMember == null) {
            this.prepareRebalance(classicGroup, String.format("Downgrade group %s from consumer to classic.", classicGroup.groupId()));
        }
        this.log.info("[GroupId {}] Converted the consumer group to a classic group.", (Object)consumerGroup.groupId());
    }

    private void validateOnlineUpgrade(ClassicGroup classicGroup) {
        if (!this.config.consumerGroupMigrationPolicy().isUpgradeEnabled()) {
            this.log.info("Cannot upgrade classic group {} to consumer group because online upgrade is disabled.", (Object)classicGroup.groupId());
            throw new GroupIdNotFoundException(String.format("Cannot upgrade classic group %s to consumer group because online upgrade is disabled.", classicGroup.groupId()));
        }
        if (!classicGroup.usesConsumerGroupProtocol()) {
            this.log.info("Cannot upgrade classic group {} to consumer group because the group does not use the consumer embedded protocol.", (Object)classicGroup.groupId());
            throw new GroupIdNotFoundException(String.format("Cannot upgrade classic group %s to consumer group because the group does not use the consumer embedded protocol.", classicGroup.groupId()));
        }
        if (classicGroup.numMembers() > this.config.consumerGroupMaxSize()) {
            this.log.info("Cannot upgrade classic group {} to consumer group because the group size exceeds the consumer group maximum size.", (Object)classicGroup.groupId());
            throw new GroupIdNotFoundException(String.format("Cannot upgrade classic group %s to consumer group because the group size exceeds the consumer group maximum size.", classicGroup.groupId()));
        }
    }

    ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<CoordinatorRecord> records) {
        ConsumerGroup consumerGroup;
        classicGroup.completeAllJoinFutures(Errors.REBALANCE_IN_PROGRESS);
        classicGroup.completeAllSyncFutures(Errors.REBALANCE_IN_PROGRESS);
        classicGroup.createGroupTombstoneRecords(records);
        try {
            consumerGroup = ConsumerGroup.fromClassicGroup(this.snapshotRegistry, this.metrics, classicGroup, this.topicHashCache, this.metadataImage);
        }
        catch (SchemaException e) {
            this.log.warn("Cannot upgrade classic group " + classicGroup.groupId() + " to consumer group because the embedded consumer protocol is malformed: " + e.getMessage() + ".", (Throwable)e);
            throw new GroupIdNotFoundException(String.format("Cannot upgrade classic group %s to consumer group because the embedded consumer protocol is malformed.", classicGroup.groupId()));
        }
        catch (UnsupportedVersionException e) {
            this.log.warn("Cannot upgrade classic group " + classicGroup.groupId() + " to consumer group: " + e.getMessage() + ".", (Throwable)e);
            throw new GroupIdNotFoundException(String.format("Cannot upgrade classic group %s to consumer group because an unsupported custom assignor is in use. Please refer to the documentation or switch to a default assignor before re-attempting the upgrade.", classicGroup.groupId()));
        }
        consumerGroup.createConsumerGroupRecords(records);
        consumerGroup.members().forEach((memberId, member) -> this.scheduleConsumerGroupSessionTimeout(consumerGroup.groupId(), (String)memberId, member.classicProtocolSessionTimeout().get()));
        this.log.info("[GroupId {}] Converted the classic group to a consumer group.", (Object)classicGroup.groupId());
        return consumerGroup;
    }

    private void removeGroup(String groupId) {
        this.groups.remove((Object)groupId);
    }

    private static boolean isSubset(List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, Map<Uuid, Set<Integer>> target) {
        if (ownedTopicPartitions == null) {
            return false;
        }
        for (ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions : ownedTopicPartitions) {
            Set<Integer> partitions = target.get(topicPartitions.topicId());
            if (partitions == null) {
                return false;
            }
            for (Integer partitionId : topicPartitions.partitions()) {
                if (partitions.contains(partitionId)) continue;
                return false;
            }
        }
        return true;
    }

    private static boolean areOwnedTasksContainedInAssignedTasks(List<StreamsGroupHeartbeatRequestData.TaskIds> ownedTasks, Map<String, Set<Integer>> assignedTasks) {
        if (ownedTasks == null) {
            return false;
        }
        for (StreamsGroupHeartbeatRequestData.TaskIds ownedTasksOfSubtopology : ownedTasks) {
            Set<Integer> partitions = assignedTasks.get(ownedTasksOfSubtopology.subtopologyId());
            if (partitions == null) {
                return false;
            }
            for (Integer partitionId : ownedTasksOfSubtopology.partitions()) {
                if (partitions.contains(partitionId)) continue;
                return false;
            }
        }
        return true;
    }

    private void throwIfConsumerGroupIsFull(ConsumerGroup group, String memberId) throws GroupMaxSizeReachedException {
        if (group.numMembers() >= this.config.consumerGroupMaxSize() && (memberId.isEmpty() || !group.hasMember(memberId))) {
            throw new GroupMaxSizeReachedException("The consumer group has reached its maximum capacity of " + this.config.consumerGroupMaxSize() + " members.");
        }
    }

    private void throwIfShareGroupIsFull(ShareGroup group, String memberId) throws GroupMaxSizeReachedException {
        if (group.numMembers() >= this.config.shareGroupMaxSize() && (memberId.isEmpty() || !group.hasMember(memberId))) {
            throw new GroupMaxSizeReachedException("The share group has reached its maximum capacity of " + this.config.shareGroupMaxSize() + " members.");
        }
    }

    private void throwIfConsumerGroupMemberEpochIsInvalid(ConsumerGroupMember member, int receivedMemberEpoch, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions) {
        if (receivedMemberEpoch > member.memberEpoch()) {
            throw new FencedMemberEpochException("The consumer group member has a greater member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
        if (!(receivedMemberEpoch >= member.memberEpoch() || receivedMemberEpoch == member.previousMemberEpoch() && GroupMetadataManager.isSubset(ownedTopicPartitions, member.assignedPartitions()))) {
            throw new FencedMemberEpochException("The consumer group member has a smaller member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
    }

    private void throwIfShareGroupMemberEpochIsInvalid(ShareGroupMember member, int receivedMemberEpoch) {
        if (receivedMemberEpoch > member.memberEpoch()) {
            throw new FencedMemberEpochException("The share group member has a greater member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
        if (receivedMemberEpoch < member.memberEpoch() && receivedMemberEpoch != member.previousMemberEpoch()) {
            throw new FencedMemberEpochException("The share group member has a smaller member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "), and it does not match the previous member epoch (" + member.previousMemberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
    }

    private void throwIfInstanceIdIsUnreleased(ConsumerGroupMember member, String groupId, String receivedMemberId, String receivedInstanceId) {
        if (member.memberEpoch() != -2) {
            this.log.info("[GroupId {}] Static member {} with instance id {} cannot join the group because the instance id is owned by member {}.", new Object[]{groupId, receivedMemberId, receivedInstanceId, member.memberId()});
            throw Errors.UNRELEASED_INSTANCE_ID.exception("Static member " + receivedMemberId + " with instance id " + receivedInstanceId + " cannot join the group because the instance id is owned by " + member.memberId() + " member.");
        }
    }

    private void throwIfInstanceIdIsFenced(ConsumerGroupMember member, String groupId, String receivedMemberId, String receivedInstanceId) {
        if (!member.memberId().equals(receivedMemberId)) {
            this.log.info("[GroupId {}] Static member {} with instance id {} is fenced by existing member {}.", new Object[]{groupId, receivedMemberId, receivedInstanceId, member.memberId()});
            throw Errors.FENCED_INSTANCE_ID.exception("Static member " + receivedMemberId + " with instance id " + receivedInstanceId + " was fenced by member " + member.memberId() + ".");
        }
    }

    private void throwIfStaticMemberIsUnknown(ConsumerGroupMember staticMember, String receivedInstanceId) {
        if (staticMember == null) {
            throw Errors.UNKNOWN_MEMBER_ID.exception("Instance id " + receivedInstanceId + " is unknown.");
        }
    }

    private void throwIfStreamsGroupIsFull(StreamsGroup group) throws GroupMaxSizeReachedException {
        if (group.numMembers() >= this.config.streamsGroupMaxSize()) {
            throw new GroupMaxSizeReachedException("The streams group has reached its maximum capacity of " + this.config.streamsGroupMaxSize() + " members.");
        }
    }

    private static void throwIfStreamsGroupMemberEpochIsInvalid(StreamsGroupMember member, int receivedMemberEpoch, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks) {
        if (receivedMemberEpoch > member.memberEpoch()) {
            throw new FencedMemberEpochException("The streams group member has a greater member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
        if (!(receivedMemberEpoch >= member.memberEpoch() || receivedMemberEpoch == member.previousMemberEpoch() && GroupMetadataManager.areOwnedTasksContainedInAssignedTasks(ownedActiveTasks, member.assignedTasks().activeTasks()) && GroupMetadataManager.areOwnedTasksContainedInAssignedTasks(ownedStandbyTasks, member.assignedTasks().standbyTasks()) && GroupMetadataManager.areOwnedTasksContainedInAssignedTasks(ownedWarmupTasks, member.assignedTasks().warmupTasks()))) {
            throw new FencedMemberEpochException("The streams group member has a smaller member epoch (" + receivedMemberEpoch + ") than the one known by the group coordinator (" + member.memberEpoch() + "). The member must abandon all its partitions and rejoin.");
        }
    }

    private static void throwIfRequestContainsInvalidTasks(SortedMap<String, ConfiguredSubtopology> subtopologySortedMap, List<StreamsGroupHeartbeatRequestData.TaskIds> tasks) {
        if (tasks == null || tasks.isEmpty()) {
            return;
        }
        for (StreamsGroupHeartbeatRequestData.TaskIds task : tasks) {
            String subtopologyId = task.subtopologyId();
            ConfiguredSubtopology subtopology = (ConfiguredSubtopology)subtopologySortedMap.get(subtopologyId);
            if (subtopology == null) {
                throw new InvalidRequestException("Subtopology " + subtopologyId + " does not exist in the topology.");
            }
            int numTasks = subtopology.numberOfTasks();
            for (Integer partition : task.partitions()) {
                if (partition >= 0 && partition < numTasks) continue;
                throw new InvalidRequestException("Task " + partition + " for subtopology " + subtopologyId + " is invalid. Number of tasks for this subtopology: " + numTasks);
            }
        }
    }

    private void throwIfClassicProtocolIsNotSupported(ConsumerGroup group, String memberId, String protocolType, JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols) {
        if (!group.supportsClassicProtocols(protocolType, ClassicGroupMember.plainProtocolSet(protocols))) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception("Member " + memberId + "'s protocols are not supported.");
        }
    }

    private void throwIfMemberDoesNotUseClassicProtocol(ConsumerGroupMember member) {
        if (!member.useClassicProtocol()) {
            throw new UnknownMemberIdException(String.format("Member %s does not use the classic protocol.", member.memberId()));
        }
    }

    private void throwIfGenerationIdUnmatched(String memberId, int memberEpoch, int requestGenerationId) {
        if (memberEpoch != requestGenerationId) {
            throw Errors.ILLEGAL_GENERATION.exception(String.format("The request generation id %s is not equal to the member epoch %d of member %s.", requestGenerationId, memberEpoch, memberId));
        }
    }

    private void throwIfClassicProtocolUnmatched(ConsumerGroupMember member, String requestProtocolType, String requestProtocolName) {
        String protocolName = member.supportedClassicProtocols().get().iterator().next().name();
        if (requestProtocolType != null && !"consumer".equals(requestProtocolType)) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(String.format("The protocol type %s from member %s request is not equal to the group protocol type %s.", requestProtocolType, member.memberId(), "consumer"));
        }
        if (requestProtocolName != null && !protocolName.equals(requestProtocolName)) {
            throw Errors.INCONSISTENT_GROUP_PROTOCOL.exception(String.format("The protocol name %s from member %s request is not equal to the protocol name %s returned in the join response.", requestProtocolName, member.memberId(), protocolName));
        }
    }

    private void throwIfRebalanceInProgress(ConsumerGroup group, ConsumerGroupMember member) {
        if (group.groupEpoch() > member.memberEpoch() && !member.state().equals((Object)MemberState.UNREVOKED_PARTITIONS)) {
            this.scheduleConsumerGroupJoinTimeoutIfAbsent(group.groupId(), member.memberId(), member.rebalanceTimeoutMs());
            throw Errors.REBALANCE_IN_PROGRESS.exception(String.format("A new rebalance is triggered in group %s and member %s should rejoin to catch up.", group.groupId(), member.memberId()));
        }
    }

    private static ConsumerProtocolSubscription deserializeSubscription(JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols) {
        try {
            return ConsumerProtocol.deserializeConsumerProtocolSubscription((ByteBuffer)ByteBuffer.wrap(((JoinGroupRequestData.JoinGroupRequestProtocol)protocols.iterator().next()).metadata()));
        }
        catch (SchemaException e) {
            throw new IllegalStateException("Malformed embedded consumer protocol in subscription deserialization.");
        }
    }

    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(String groupId, String memberId, int memberEpoch, String instanceId, String rackId, int rebalanceTimeoutMs, String clientId, String clientHost, StreamsGroupHeartbeatRequestData.Topology topology, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks, String processId, StreamsGroupHeartbeatRequestData.Endpoint userEndpoint, List<StreamsGroupHeartbeatRequestData.KeyValue> clientTags, boolean shutdownApplication, int memberEndpointEpoch) throws ApiException {
        int targetAssignmentEpoch;
        TasksTuple targetAssignment;
        ConfiguredTopology updatedConfiguredTopology;
        StreamsGroup group;
        boolean isJoining;
        long currentTimeMs = this.time.milliseconds();
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        ArrayList<StreamsGroupHeartbeatResponseData.Status> returnedStatus = new ArrayList<StreamsGroupHeartbeatResponseData.Status>();
        boolean bl = isJoining = memberEpoch == 0;
        if (isJoining) {
            group = this.getOrCreateStreamsGroup(groupId);
            this.throwIfStreamsGroupIsFull(group);
        } else {
            group = this.getStreamsGroupOrThrow(groupId);
        }
        if (instanceId != null) {
            throw new UnsupportedOperationException("Static members are not supported yet.");
        }
        StreamsGroupMember member = this.getOrMaybeCreateDynamicStreamsGroupMember(group, memberId, memberEpoch, ownedActiveTasks, ownedStandbyTasks, ownedWarmupTasks, isJoining);
        StreamsGroupMember updatedMember = new StreamsGroupMember.Builder(member).maybeUpdateInstanceId(Optional.empty()).maybeUpdateRackId(Optional.ofNullable(rackId)).maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(rebalanceTimeoutMs)).maybeUpdateTopologyEpoch(topology != null ? OptionalInt.of(topology.epoch()) : OptionalInt.empty()).setClientId(clientId).setClientHost(clientHost).maybeUpdateProcessId(Optional.ofNullable(processId)).maybeUpdateClientTags(Optional.ofNullable(clientTags).map(x -> x.stream().collect(Collectors.toMap(StreamsGroupHeartbeatRequestData.KeyValue::key, StreamsGroupHeartbeatRequestData.KeyValue::value)))).maybeUpdateUserEndpoint(Optional.ofNullable(userEndpoint).map(x -> new StreamsGroupMemberMetadataValue.Endpoint().setHost(x.host()).setPort(x.port()))).build();
        boolean bumpGroupEpoch = this.hasStreamsMemberMetadataChanged(groupId, member, updatedMember, records);
        StreamsTopology updatedTopology = this.maybeUpdateTopology(groupId, memberId, topology, group, records);
        GroupMetadataManager.maybeSetTopologyStaleStatus(group, updatedMember, returnedStatus);
        boolean reconfigureTopology = group.topology().isEmpty();
        long metadataHash = group.metadataHash();
        if (reconfigureTopology || group.configuredTopology().isEmpty() || group.hasMetadataExpired(currentTimeMs)) {
            metadataHash = group.computeMetadataHash(this.metadataImage, this.topicHashCache, updatedTopology);
            if (metadataHash != group.metadataHash()) {
                this.log.info("[GroupId {}][MemberId {}] Computed new metadata hash: {}.", new Object[]{groupId, memberId, metadataHash});
                bumpGroupEpoch = true;
                reconfigureTopology = true;
            }
            if (reconfigureTopology || group.configuredTopology().isEmpty()) {
                this.log.info("[GroupId {}][MemberId {}] Configuring the topology {}", new Object[]{groupId, memberId, updatedTopology});
                updatedConfiguredTopology = InternalTopicManager.configureTopics(this.logContext, metadataHash, updatedTopology, this.metadataImage.topics());
                group.setConfiguredTopology(updatedConfiguredTopology);
            } else {
                updatedConfiguredTopology = group.configuredTopology().get();
            }
        } else {
            updatedConfiguredTopology = group.configuredTopology().get();
        }
        if (updatedConfiguredTopology.isReady()) {
            SortedMap<String, ConfiguredSubtopology> subtopologySortedMap = updatedConfiguredTopology.subtopologies().get();
            GroupMetadataManager.throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedActiveTasks);
            GroupMetadataManager.throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedStandbyTasks);
            GroupMetadataManager.throwIfRequestContainsInvalidTasks(subtopologySortedMap, ownedWarmupTasks);
        }
        int groupEpoch = group.groupEpoch();
        if (bumpGroupEpoch) {
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, ++groupEpoch, metadataHash));
            this.log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {}.", new Object[]{groupId, memberId, groupEpoch, metadataHash});
            this.metrics.record("StreamsGroupRebalances");
            group.setMetadataRefreshDeadline(currentTimeMs + Integer.MAX_VALUE, groupEpoch);
        }
        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = this.updateStreamsTargetAssignment(group, groupEpoch, updatedMember, updatedConfiguredTopology, this.metadataImage, records);
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId());
        }
        updatedMember = this.maybeReconcile(groupId, updatedMember, group::currentActiveTaskProcessId, group::currentStandbyTaskProcessIds, group::currentWarmupTaskProcessIds, targetAssignmentEpoch, targetAssignment, ownedActiveTasks, ownedStandbyTasks, ownedWarmupTasks, records);
        this.scheduleStreamsGroupSessionTimeout(groupId, memberId);
        if (shutdownApplication) {
            group.setShutdownRequestMemberId(memberId);
        }
        StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData().setMemberId(updatedMember.memberId()).setMemberEpoch(updatedMember.memberEpoch().intValue()).setHeartbeatIntervalMs(this.streamsGroupHeartbeatIntervalMs(groupId));
        if (memberEpoch == 0 || StreamsGroupMember.hasAssignedTasksChanged(member, updatedMember)) {
            response.setActiveTasks(this.createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
            response.setStandbyTasks(this.createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
            response.setWarmupTasks(this.createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
            if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
                group.setEndpointInformationEpoch(group.endpointInformationEpoch() + 1);
            }
        }
        if (group.endpointInformationEpoch() != memberEndpointEpoch) {
            response.setPartitionsByUserEndpoint(this.maybeBuildEndpointToPartitions(group, updatedMember));
        }
        response.setEndpointInformationEpoch(group.endpointInformationEpoch());
        Map<String, CreateTopicsRequestData.CreatableTopic> internalTopicsToBeCreated = Collections.emptyMap();
        if (updatedConfiguredTopology.topicConfigurationException().isPresent()) {
            TopicConfigurationException exception = updatedConfiguredTopology.topicConfigurationException().get();
            internalTopicsToBeCreated = updatedConfiguredTopology.internalTopicsToBeCreated();
            returnedStatus.add(new StreamsGroupHeartbeatResponseData.Status().setStatusCode(exception.status().code()).setStatusDetail(exception.getMessage()));
        }
        group.getShutdownRequestMemberId().ifPresent(requestingMemberId -> returnedStatus.add(new StreamsGroupHeartbeatResponseData.Status().setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()).setStatusDetail(String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", requestingMemberId))));
        if (!returnedStatus.isEmpty()) {
            response.setStatus(returnedStatus);
        }
        return new CoordinatorResult(records, (Object)new StreamsGroupHeartbeatResult(response, internalTopicsToBeCreated));
    }

    private static void maybeSetTopologyStaleStatus(StreamsGroup group, StreamsGroupMember member, List<StreamsGroupHeartbeatResponseData.Status> returnedStatus) {
        if (group.topology().isPresent() && member.topologyEpoch() < group.topology().get().topologyEpoch()) {
            returnedStatus.add(new StreamsGroupHeartbeatResponseData.Status().setStatusCode(StreamsGroupHeartbeatResponse.Status.STALE_TOPOLOGY.code()).setStatusDetail(String.format("The member's topology epoch %d is behind the group's topology epoch %d.", member.topologyEpoch(), group.topology().get().topologyEpoch())));
        }
    }

    private StreamsTopology maybeUpdateTopology(String groupId, String memberId, StreamsGroupHeartbeatRequestData.Topology topology, StreamsGroup group, List<CoordinatorRecord> records) {
        if (topology != null) {
            StreamsTopology streamsTopologyFromRequest = StreamsTopology.fromHeartbeatRequest(topology);
            if (group.topology().isEmpty()) {
                this.log.info("[GroupId {}][MemberId {}] Member initialized the topology with epoch {}", new Object[]{groupId, memberId, topology.epoch()});
                StreamsGroupTopologyValue recordValue = StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord(topology);
                records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, recordValue));
                return streamsTopologyFromRequest;
            }
            if (group.topology().get().topologyEpoch() > topology.epoch()) {
                this.log.info("[GroupId {}][MemberId {}] Member joined with stale topology epoch {}", new Object[]{groupId, memberId, topology.epoch()});
                return group.topology().get();
            }
            if (!group.topology().get().equals(streamsTopologyFromRequest)) {
                throw new InvalidRequestException("Topology updates are not supported yet.");
            }
            this.log.debug("[GroupId {}][MemberId {}] Member joined with currently initialized topology {}", new Object[]{groupId, memberId, topology.epoch()});
            return group.topology().get();
        }
        if (group.topology().isPresent()) {
            return group.topology().get();
        }
        throw new IllegalStateException("The topology is null and the group topology is also null.");
    }

    private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIds(Map<String, Set<Integer>> taskIds) {
        return taskIds.entrySet().stream().map(entry -> new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId((String)entry.getKey()).setPartitions(((Set)entry.getValue()).stream().sorted().toList())).collect(Collectors.toList());
    }

    private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group, StreamsGroupMember updatedMember) {
        ArrayList<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<StreamsGroupHeartbeatResponseData.EndpointToPartitions>();
        Map<String, StreamsGroupMember> members = group.members();
        for (Map.Entry<String, StreamsGroupMember> entry : members.entrySet()) {
            StreamsGroupMember groupMember;
            String memberIdForAssignment = entry.getKey();
            Optional<StreamsGroupMemberMetadataValue.Endpoint> endpointOptional = members.get(memberIdForAssignment).userEndpoint();
            StreamsGroupMember streamsGroupMember = groupMember = updatedMember != null && memberIdForAssignment.equals(updatedMember.memberId()) ? updatedMember : members.get(memberIdForAssignment);
            if (!endpointOptional.isPresent()) continue;
            StreamsGroupMemberMetadataValue.Endpoint endpoint = endpointOptional.get();
            StreamsGroupHeartbeatResponseData.Endpoint responseEndpoint = new StreamsGroupHeartbeatResponseData.Endpoint();
            responseEndpoint.setHost(endpoint.host());
            responseEndpoint.setPort(endpoint.port());
            StreamsGroupHeartbeatResponseData.EndpointToPartitions endpointToPartitions = EndpointToPartitionsManager.endpointToPartitions(groupMember, responseEndpoint, group, this.metadataImage);
            endpointToPartitionsList.add(endpointToPartitions);
        }
        return endpointToPartitionsList.isEmpty() ? null : endpointToPartitionsList;
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(AuthorizableRequestContext context, String groupId, String memberId, int memberEpoch, String instanceId, String rackId, int rebalanceTimeoutMs, List<String> subscribedTopicNames, String subscribedTopicRegex, String assignorName, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions) throws ApiException {
        boolean isFullRequest;
        int targetAssignmentEpoch;
        Assignment targetAssignment;
        long currentTimeMs = this.time.milliseconds();
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        boolean createIfNotExists = memberEpoch == 0;
        ConsumerGroup group = this.getOrMaybeCreateConsumerGroup(groupId, createIfNotExists, records);
        this.throwIfConsumerGroupIsFull(group, memberId);
        if (memberId.isEmpty()) {
            memberId = Uuid.randomUuid().toString();
        }
        ConsumerGroupMember member = instanceId == null ? this.getOrMaybeSubscribeDynamicConsumerGroupMember(group, memberId, memberEpoch, ownedTopicPartitions, createIfNotExists, false) : this.getOrMaybeSubscribeStaticConsumerGroupMember(group, memberId, memberEpoch, instanceId, ownedTopicPartitions, createIfNotExists, false, records);
        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member).maybeUpdateInstanceId(Optional.ofNullable(instanceId)).maybeUpdateRackId(Optional.ofNullable(rackId)).maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(rebalanceTimeoutMs)).maybeUpdateServerAssignorName(Optional.ofNullable(assignorName)).maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)).maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex)).setClientId(context.clientId()).setClientHost(context.clientAddress().toString()).setClassicMemberMetadata(null).build();
        boolean bumpGroupEpoch = group.groupEpoch() == 0;
        bumpGroupEpoch |= this.hasMemberSubscriptionChanged(groupId, member, updatedMember, records);
        int groupEpoch = group.groupEpoch();
        SubscriptionType subscriptionType = group.subscriptionType();
        if ((bumpGroupEpoch |= this.maybeUpdateRegularExpressions(context, group, member, updatedMember, records)) || group.hasMetadataExpired(currentTimeMs)) {
            UpdateSubscriptionMetadataResult result = this.updateSubscriptionMetadata(group, bumpGroupEpoch, member, updatedMember, records);
            groupEpoch = result.groupEpoch;
            subscriptionType = result.subscriptionType;
        }
        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = this.updateTargetAssignment(group, groupEpoch, member, updatedMember, subscriptionType, records);
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
        }
        updatedMember = this.maybeReconcile(groupId, updatedMember, group::currentPartitionEpoch, targetAssignmentEpoch, targetAssignment, ownedTopicPartitions, records);
        this.scheduleConsumerGroupSessionTimeout(groupId, memberId);
        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData().setMemberId(updatedMember.memberId()).setMemberEpoch(updatedMember.memberEpoch()).setHeartbeatIntervalMs(this.consumerGroupHeartbeatIntervalMs(groupId));
        boolean bl = isFullRequest = rebalanceTimeoutMs != -1 && (subscribedTopicNames != null || subscribedTopicRegex != null) && ownedTopicPartitions != null;
        if (memberEpoch == 0 || isFullRequest || ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
            response.setAssignment(ConsumerGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
        }
        return new CoordinatorResult(records, (Object)response);
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGroup(ConsumerGroup group, AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture<JoinGroupResponseData> responseFuture) throws ApiException {
        boolean downgrade;
        int targetAssignmentEpoch;
        Assignment targetAssignment;
        long currentTimeMs = this.time.milliseconds();
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        String groupId = request.groupId();
        String instanceId = request.groupInstanceId();
        int sessionTimeoutMs = request.sessionTimeoutMs();
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = request.protocols();
        String memberId = request.memberId();
        boolean isUnknownMember = memberId.equals("");
        if (isUnknownMember) {
            memberId = Uuid.randomUuid().toString();
        }
        this.throwIfConsumerGroupIsFull(group, memberId);
        this.throwIfClassicProtocolIsNotSupported(group, memberId, request.protocolType(), protocols);
        if (JoinGroupRequest.requiresKnownMemberId((JoinGroupRequestData)request, (int)context.requestVersion())) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.MEMBER_ID_REQUIRED.code()));
            this.log.info("[GroupId {}] Dynamic member with unknown member id joins the consumer group. Created a new member id {} and requesting the member to rejoin with this id.", (Object)groupId, (Object)memberId);
            return EMPTY_RESULT;
        }
        ConsumerGroupMember member = instanceId == null ? this.getOrMaybeSubscribeDynamicConsumerGroupMember(group, memberId, -1, List.of(), true, true) : this.getOrMaybeSubscribeStaticConsumerGroupMember(group, memberId, -1, instanceId, List.of(), isUnknownMember, true, records);
        int groupEpoch = group.groupEpoch();
        SubscriptionType subscriptionType = group.subscriptionType();
        ConsumerProtocolSubscription subscription = GroupMetadataManager.deserializeSubscription(protocols);
        ConsumerGroupMember updatedMember = new ConsumerGroupMember.Builder(member).maybeUpdateInstanceId(Optional.ofNullable(instanceId)).maybeUpdateRackId(Utils.toOptional(subscription.rackId())).maybeUpdateRebalanceTimeoutMs(Utils.ofSentinel(request.rebalanceTimeoutMs())).maybeUpdateServerAssignorName(Optional.empty()).maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics())).setClientId(context.clientId()).setClientHost(context.clientAddress().toString()).setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata().setSessionTimeoutMs(sessionTimeoutMs).setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols))).build();
        boolean bumpGroupEpoch = this.hasMemberSubscriptionChanged(groupId, member, updatedMember, records);
        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
            UpdateSubscriptionMetadataResult result = this.updateSubscriptionMetadata(group, bumpGroupEpoch, member, updatedMember, records);
            groupEpoch = result.groupEpoch;
            subscriptionType = result.subscriptionType;
        }
        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = this.updateTargetAssignment(group, groupEpoch, member, updatedMember, subscriptionType, records);
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
        }
        updatedMember = this.maybeReconcile(groupId, updatedMember, group::currentPartitionEpoch, targetAssignmentEpoch, targetAssignment, Utils.toTopicPartitions(subscription.ownedPartitions(), this.metadataImage.topics()), records);
        ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(request.groupInstanceId());
        boolean bl = downgrade = existingStaticMemberOrNull != null && this.validateOnlineDowngradeWithReplacedMember(group, existingStaticMemberOrNull);
        if (downgrade) {
            this.convertToClassicGroup(group, Set.of(), updatedMember, records);
        }
        JoinGroupResponseData response = new JoinGroupResponseData().setMemberId(updatedMember.memberId()).setGenerationId(updatedMember.memberEpoch()).setProtocolType("consumer").setProtocolName(updatedMember.supportedClassicProtocols().get().iterator().next().name());
        CompletableFuture appendFuture = new CompletableFuture();
        appendFuture.whenComplete((__, t) -> {
            if (t == null) {
                this.cancelConsumerGroupJoinTimeout(groupId, response.memberId());
                if (!downgrade) {
                    this.scheduleConsumerGroupSessionTimeout(groupId, response.memberId(), sessionTimeoutMs);
                    this.scheduleConsumerGroupSyncTimeout(groupId, response.memberId(), request.rebalanceTimeoutMs());
                }
                responseFuture.complete(response);
            }
        });
        return new CoordinatorResult(records, null, appendFuture, !downgrade);
    }

    private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(String groupId, String memberId, int memberEpoch, String rackId, String clientId, String clientHost, List<String> subscribedTopicNames) throws ApiException {
        boolean isFullRequest;
        int targetAssignmentEpoch;
        Assignment targetAssignment;
        long currentTimeMs = this.time.milliseconds();
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        boolean createIfNotExists = memberEpoch == 0;
        ShareGroup group = this.getOrMaybeCreateShareGroup(groupId, createIfNotExists);
        this.throwIfShareGroupIsFull(group, memberId);
        ShareGroupMember member = this.getOrMaybeSubscribeShareGroupMember(group, memberId, memberEpoch, createIfNotExists);
        ShareGroupMember updatedMember = new ShareGroupMember.Builder(member).maybeUpdateRackId(Optional.ofNullable(rackId)).maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames)).setClientId(clientId).setClientHost(clientHost).build();
        boolean bumpGroupEpoch = this.hasMemberSubscriptionChanged(groupId, member, updatedMember, records) || this.initializedAssignmentPending(group);
        int groupEpoch = group.groupEpoch();
        Map<String, SubscriptionCount> subscribedTopicNamesMap = group.subscribedTopicNames();
        SubscriptionType subscriptionType = group.subscriptionType();
        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
            subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
            long groupMetadataHash = ModernGroup.computeMetadataHash(subscribedTopicNamesMap, this.topicHashCache, this.metadataImage);
            int numMembers = group.numMembers();
            if (!group.hasMember(updatedMember.memberId())) {
                ++numMembers;
            }
            subscriptionType = ModernGroup.subscriptionType(subscribedTopicNamesMap, numMembers);
            if (groupMetadataHash != group.metadataHash()) {
                this.log.info("[GroupId {}] Computed new metadata hash: {}.", (Object)groupId, (Object)groupMetadataHash);
                bumpGroupEpoch = true;
            }
            if (bumpGroupEpoch) {
                records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, ++groupEpoch, groupMetadataHash));
                this.log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", new Object[]{groupId, groupEpoch, groupMetadataHash});
                this.metrics.record("ShareGroupRebalances");
            }
            group.setMetadataRefreshDeadline(currentTimeMs + Integer.MAX_VALUE, groupEpoch);
        }
        if (groupEpoch > group.assignmentEpoch()) {
            targetAssignment = this.updateTargetAssignment(group, groupEpoch, updatedMember, subscriptionType, records);
            targetAssignmentEpoch = groupEpoch;
        } else {
            targetAssignmentEpoch = group.assignmentEpoch();
            targetAssignment = group.targetAssignment(updatedMember.memberId());
        }
        updatedMember = this.maybeReconcile(groupId, updatedMember, targetAssignmentEpoch, targetAssignment, records);
        this.scheduleShareGroupSessionTimeout(groupId, memberId);
        ShareGroupHeartbeatResponseData response = new ShareGroupHeartbeatResponseData().setMemberId(updatedMember.memberId()).setMemberEpoch(updatedMember.memberEpoch()).setHeartbeatIntervalMs(this.shareGroupHeartbeatIntervalMs(groupId));
        boolean bl = isFullRequest = subscribedTopicNames != null;
        if (memberEpoch == 0 || isFullRequest || ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
            response.setAssignment(ShareGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
        }
        return new CoordinatorResult(records, Map.entry(response, this.maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscribedTopicNamesMap.keySet(), records)));
    }

    private boolean initializedAssignmentPending(ShareGroup group) {
        if (!this.shareGroupStatePartitionMetadata.containsKey((Object)group.groupId())) {
            return false;
        }
        if (group.isEmpty()) {
            return false;
        }
        Map<Uuid, Set<Integer>> initializedTps = GroupMetadataManager.stripInitValue(((ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)group.groupId())).initializedTopics());
        HashMap<Uuid, Set> currentAssigned = new HashMap<Uuid, Set>();
        for (Assignment assignment : group.targetAssignment().values()) {
            for (Map.Entry<Uuid, Set<Integer>> tps : assignment.partitions().entrySet()) {
                currentAssigned.computeIfAbsent(tps.getKey(), k -> new HashSet()).addAll((Collection)tps.getValue());
            }
        }
        return !initializedTps.equals(currentAssigned);
    }

    Map<Uuid, ShareGroup.InitMapValue> subscribedTopicsChangeMap(String groupId, Set<String> subscriptionTopicNames) {
        if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty()) {
            return Map.of();
        }
        HashMap<Uuid, ShareGroup.InitMapValue> topicPartitionChangeMap = new HashMap<Uuid, ShareGroup.InitMapValue>();
        ShareGroup.ShareGroupStatePartitionMetadataInfo info = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        long curTimestamp = this.time.milliseconds();
        long delta = (long)this.config.offsetCommitTimeoutMs() * 2L;
        HashMap alreadyInitialized = info == null ? new HashMap() : GroupMetadataManager.combineInitMaps(info.initializedTopics(), info.initializingTopics().entrySet().stream().filter(entry -> curTimestamp - ((ShareGroup.InitMapValue)entry.getValue()).timestamp() < delta).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        subscriptionTopicNames.forEach(topicName -> {
            TopicImage topicImage = this.metadataImage.topics().getTopic(topicName);
            if (topicImage != null) {
                Set alreadyInitializedPartSet;
                Set<Object> set = alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? ((ShareGroup.InitMapValue)alreadyInitialized.get(topicImage.id())).partitions() : Set.of();
                if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) {
                    Set partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().filter(p -> !alreadyInitializedPartSet.contains(p)).collect(Collectors.toSet());
                    topicPartitionChangeMap.computeIfAbsent(topicImage.id(), k -> new ShareGroup.InitMapValue(topicImage.name(), partitionSet, curTimestamp));
                }
            }
        });
        return topicPartitionChangeMap;
    }

    private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest(String groupId, int groupEpoch, Set<String> subscriptionTopicNames, List<CoordinatorRecord> records) {
        if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty() || this.metadataImage.isEmpty()) {
            return Optional.empty();
        }
        Map<Uuid, ShareGroup.InitMapValue> topicPartitionChangeMap = this.subscribedTopicsChangeMap(groupId, subscriptionTopicNames);
        if (topicPartitionChangeMap.isEmpty()) {
            return Optional.empty();
        }
        this.addInitializingTopicsRecords(groupId, records, topicPartitionChangeMap);
        return Optional.of(this.buildInitializeShareGroupStateRequest(groupId, groupEpoch, topicPartitionChangeMap));
    }

    private InitializeShareGroupStateParameters buildInitializeShareGroupStateRequest(String groupId, int groupEpoch, Map<Uuid, ShareGroup.InitMapValue> topicPartitions) {
        return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(new GroupTopicPartitionData(groupId, topicPartitions.entrySet().stream().map(entry -> new TopicData((Uuid)entry.getKey(), ((ShareGroup.InitMapValue)entry.getValue()).partitions().stream().map(partitionId -> PartitionFactory.newPartitionStateData((int)partitionId, (int)groupEpoch, (long)-1L)).toList())).toList())).build();
    }

    private InitializeShareGroupStateParameters buildInitializeShareGroupState(String groupId, int groupEpoch, Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions) {
        return new InitializeShareGroupStateParameters.Builder().setGroupTopicPartitionData(new GroupTopicPartitionData(groupId, offsetByTopicPartitions.entrySet().stream().map(entry -> new TopicData((Uuid)entry.getKey(), ((Map)entry.getValue()).entrySet().stream().map(partitionEntry -> PartitionFactory.newPartitionStateData((int)((Integer)partitionEntry.getKey()), (int)groupEpoch, (long)((Long)partitionEntry.getValue()))).toList())).toList())).build();
    }

    void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> records, Map<Uuid, ShareGroup.InitMapValue> topicPartitionMap) {
        if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
            return;
        }
        ShareGroup.ShareGroupStatePartitionMetadataInfo currentMap = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        if (currentMap == null) {
            records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, topicPartitionMap, Map.of(), Map.of()));
            return;
        }
        Map<Uuid, ShareGroup.InitMapValue> finalInitializingMap = GroupMetadataManager.combineInitMaps(currentMap.initializingTopics(), topicPartitionMap);
        HashSet<Uuid> currentDeleting = new HashSet<Uuid>(currentMap.deletingTopics());
        if (!currentDeleting.isEmpty()) {
            finalInitializingMap.keySet().forEach(key -> {
                if (currentDeleting.remove(key)) {
                    this.log.warn("Initializing topic {} for share group {} found in deleting state as well, removing from deleting.", (Object)this.metadataImage.topics().getTopic(key).name(), (Object)groupId);
                }
            });
        }
        records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, finalInitializingMap, currentMap.initializedTopics(), this.attachTopicName(currentDeleting)));
    }

    static Map<Uuid, ShareGroup.InitMapValue> combineInitMaps(Map<Uuid, ShareGroup.InitMapValue> initialized, Map<Uuid, ShareGroup.InitMapValue> initializing) {
        HashMap<Uuid, ShareGroup.InitMapValue> finalInitMap = new HashMap<Uuid, ShareGroup.InitMapValue>();
        HashSet<Uuid> combinedTopicIdSet = new HashSet<Uuid>(initialized.keySet());
        Set<Uuid> initializingSet = initializing.keySet();
        combinedTopicIdSet.addAll(initializingSet);
        for (Uuid topicId : combinedTopicIdSet) {
            HashSet initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet();
            long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1L;
            String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN";
            HashSet<Integer> finalPartitions = new HashSet<Integer>(initializedPartitions);
            if (initializingSet.contains(topicId)) {
                finalPartitions.addAll(initializing.get(topicId).partitions());
                timestamp = initializing.get(topicId).timestamp();
                name = initializing.get(topicId).name();
            }
            finalInitMap.putIfAbsent(topicId, new ShareGroup.InitMapValue(name, finalPartitions, timestamp));
        }
        return finalInitMap;
    }

    static Map<Uuid, Set<Integer>> stripInitValue(Map<Uuid, ShareGroup.InitMapValue> initMap) {
        return initMap.entrySet().stream().map(entry -> Map.entry((Uuid)entry.getKey(), ((ShareGroup.InitMapValue)entry.getValue()).partitions())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    private ConsumerGroupMember getOrMaybeSubscribeDynamicConsumerGroupMember(ConsumerGroup group, String memberId, int memberEpoch, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, boolean createIfNotExists, boolean useClassicProtocol) {
        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
        if (!useClassicProtocol) {
            this.throwIfConsumerGroupMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
        }
        if (createIfNotExists) {
            this.log.info("[GroupId {}] Member {} joins the consumer group using the {} protocol.", new Object[]{group.groupId(), memberId, useClassicProtocol ? "classic" : "consumer"});
        }
        return member;
    }

    private StreamsGroupMember getOrMaybeCreateDynamicStreamsGroupMember(StreamsGroup group, String memberId, int memberEpoch, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks, boolean memberIsJoining) {
        StreamsGroupMember member = memberIsJoining ? group.getOrCreateDefaultMember(memberId) : group.getMemberOrThrow(memberId);
        GroupMetadataManager.throwIfStreamsGroupMemberEpochIsInvalid(member, memberEpoch, ownedActiveTasks, ownedStandbyTasks, ownedWarmupTasks);
        if (memberIsJoining) {
            this.log.info("[GroupId {}][MemberId {}] Member joins the streams group.", (Object)group.groupId(), (Object)memberId);
        }
        return member;
    }

    private ConsumerGroupMember getOrMaybeSubscribeStaticConsumerGroupMember(ConsumerGroup group, String memberId, int memberEpoch, String instanceId, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, boolean createIfNotExists, boolean useClassicProtocol, List<CoordinatorRecord> records) {
        ConsumerGroupMember existingStaticMemberOrNull = group.staticMember(instanceId);
        if (createIfNotExists) {
            if (existingStaticMemberOrNull == null) {
                ConsumerGroupMember newMember = group.getOrMaybeCreateMember(memberId, true);
                this.log.info("[GroupId {}] Static member {} with instance id {} joins the consumer group using the {} protocol.", new Object[]{group.groupId(), memberId, instanceId, useClassicProtocol ? "classic" : "consumer"});
                return newMember;
            }
            if (!useClassicProtocol && !existingStaticMemberOrNull.useClassicProtocol()) {
                this.throwIfInstanceIdIsUnreleased(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
            }
            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(existingStaticMemberOrNull, memberId).setMemberEpoch(0).setPreviousMemberEpoch(0).build();
            this.replaceMember(records, group, existingStaticMemberOrNull, newMember);
            this.log.info("[GroupId {}] Static member with instance id {} re-joins the consumer group using the {} protocol. Created a new member {} to replace the existing member {}.", new Object[]{group.groupId(), instanceId, useClassicProtocol ? "classic" : "consumer", memberId, existingStaticMemberOrNull.memberId()});
            return newMember;
        }
        this.throwIfStaticMemberIsUnknown(existingStaticMemberOrNull, instanceId);
        this.throwIfInstanceIdIsFenced(existingStaticMemberOrNull, group.groupId(), memberId, instanceId);
        if (!useClassicProtocol) {
            this.throwIfConsumerGroupMemberEpochIsInvalid(existingStaticMemberOrNull, memberEpoch, ownedTopicPartitions);
        }
        return existingStaticMemberOrNull;
    }

    private ShareGroupMember getOrMaybeSubscribeShareGroupMember(ShareGroup group, String memberId, int memberEpoch, boolean createIfNotExists) {
        ShareGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
        this.throwIfShareGroupMemberEpochIsInvalid(member, memberEpoch);
        if (createIfNotExists) {
            this.log.info("[GroupId {}] Member {} joins the share group using the share protocol.", (Object)group.groupId(), (Object)memberId);
        }
        return member;
    }

    private boolean hasMemberSubscriptionChanged(String groupId, ConsumerGroupMember member, ConsumerGroupMember updatedMember, List<CoordinatorRecord> records) throws InvalidRegularExpression {
        String memberId = updatedMember.memberId();
        if (!updatedMember.equals(member)) {
            records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, updatedMember));
            if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
                this.log.debug("[GroupId {}] Member {} updated its subscribed topics to: {}.", new Object[]{groupId, memberId, updatedMember.subscribedTopicNames()});
                return true;
            }
        }
        return false;
    }

    private static boolean isNotEmpty(String value) {
        return value != null && !value.isEmpty();
    }

    private boolean maybeUpdateRegularExpressions(AuthorizableRequestContext context, ConsumerGroup group, ConsumerGroupMember member, ConsumerGroupMember updatedMember, List<CoordinatorRecord> records) {
        long currentTimeMs = this.time.milliseconds();
        String groupId = group.groupId();
        String memberId = updatedMember.memberId();
        String oldSubscribedTopicRegex = member.subscribedTopicRegex();
        String newSubscribedTopicRegex = updatedMember.subscribedTopicRegex();
        boolean bumpGroupEpoch = false;
        boolean requireRefresh = false;
        if (!Objects.equals(oldSubscribedTopicRegex, newSubscribedTopicRegex)) {
            this.log.debug("[GroupId {}] Member {} updated its subscribed regex to: {}.", new Object[]{groupId, memberId, newSubscribedTopicRegex});
            if (GroupMetadataManager.isNotEmpty(oldSubscribedTopicRegex) && group.numSubscribedMembers(oldSubscribedTopicRegex) == 1) {
                records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, oldSubscribedTopicRegex));
            }
            if (GroupMetadataManager.isNotEmpty(newSubscribedTopicRegex)) {
                if (group.numSubscribedMembers(newSubscribedTopicRegex) == 0) {
                    Utils.throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
                    requireRefresh = true;
                } else {
                    bumpGroupEpoch = group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
                }
            }
        }
        if (!requireRefresh && group.subscribedRegularExpressions().isEmpty()) {
            return bumpGroupEpoch;
        }
        String key = group.groupId() + "-regex";
        if (this.executor.isScheduled(key)) {
            return bumpGroupEpoch;
        }
        long lastRefreshTimeMs = group.lastResolvedRegularExpressionRefreshTimeMs();
        if (currentTimeMs <= lastRefreshTimeMs + 10000L) {
            return bumpGroupEpoch;
        }
        HashMap<String, Integer> subscribedRegularExpressions = new HashMap<String, Integer>(group.subscribedRegularExpressions());
        if (GroupMetadataManager.isNotEmpty(oldSubscribedTopicRegex)) {
            subscribedRegularExpressions.compute(oldSubscribedTopicRegex, Utils::decValue);
        }
        if (GroupMetadataManager.isNotEmpty(newSubscribedTopicRegex)) {
            subscribedRegularExpressions.compute(newSubscribedTopicRegex, Utils::incValue);
        }
        requireRefresh |= subscribedRegularExpressions.size() != group.numResolvedRegularExpressions();
        requireRefresh |= group.lastResolvedRegularExpressionVersion() < this.lastMetadataImageWithNewTopics;
        if ((requireRefresh |= currentTimeMs > lastRefreshTimeMs + (long)this.config.consumerGroupRegexRefreshIntervalMs()) && !subscribedRegularExpressions.isEmpty()) {
            Set regexes = Collections.unmodifiableSet(subscribedRegularExpressions.keySet());
            this.executor.schedule(key, () -> GroupMetadataManager.refreshRegularExpressions(context, groupId, this.log, this.time, this.metadataImage, this.authorizerPlugin, regexes), (result, exception) -> this.handleRegularExpressionsResult(groupId, memberId, (Map<String, ResolvedRegularExpression>)result, exception));
        }
        return bumpGroupEpoch;
    }

    public static Map<String, ResolvedRegularExpression> refreshRegularExpressions(AuthorizableRequestContext context, String groupId, Logger log, Time time, MetadataImage image, Optional<Plugin<Authorizer>> authorizerPlugin, Set<String> regexes) {
        long startTimeMs = time.milliseconds();
        log.debug("[GroupId {}] Refreshing regular expressions: {}", (Object)groupId, regexes);
        HashMap<String, Set<String>> resolvedRegexes = new HashMap<String, Set<String>>(regexes.size());
        ArrayList<Pattern> compiledRegexes = new ArrayList<Pattern>(regexes.size());
        for (String regex : regexes) {
            resolvedRegexes.put(regex, new HashSet());
            try {
                compiledRegexes.add(Pattern.compile((String)regex));
            }
            catch (PatternSyntaxException ex) {
                log.error("[GroupId {}] Couldn't parse regular expression '{}' due to `{}`. Ignoring it.", new Object[]{groupId, regex, ex.getDescription()});
            }
        }
        for (String topicName : image.topics().topicsByName().keySet()) {
            for (Pattern regex : compiledRegexes) {
                if (!regex.matcher((CharSequence)topicName).matches()) continue;
                ((Set)resolvedRegexes.get(regex.pattern())).add(topicName);
            }
        }
        GroupMetadataManager.filterTopicDescribeAuthorizedTopics(context, authorizerPlugin, resolvedRegexes);
        long version = image.provenance().lastContainedOffset();
        HashMap<String, ResolvedRegularExpression> result = new HashMap<String, ResolvedRegularExpression>(resolvedRegexes.size());
        for (Map.Entry resolvedRegex : resolvedRegexes.entrySet()) {
            result.put((String)resolvedRegex.getKey(), new ResolvedRegularExpression((Set)resolvedRegex.getValue(), version, startTimeMs));
        }
        log.info("[GroupId {}] Scanned {} topics to refresh regular expressions {} in {}ms.", new Object[]{groupId, image.topics().topicsByName().size(), resolvedRegexes.keySet(), time.milliseconds() - startTimeMs});
        return result;
    }

    private static void filterTopicDescribeAuthorizedTopics(AuthorizableRequestContext context, Optional<Plugin<Authorizer>> authorizerPlugin, Map<String, Set<String>> resolvedRegexes) {
        if (authorizerPlugin.isEmpty()) {
            return;
        }
        HashMap topicNameCount = new HashMap();
        resolvedRegexes.values().forEach(topicNames -> topicNames.forEach(topicName -> topicNameCount.compute(topicName, Utils::incValue)));
        List actions = topicNameCount.entrySet().stream().map(entry -> {
            ResourcePattern resource = new ResourcePattern(ResourceType.TOPIC, (String)entry.getKey(), PatternType.LITERAL);
            return new Action(AclOperation.DESCRIBE, resource, ((Integer)entry.getValue()).intValue(), true, false);
        }).collect(Collectors.toList());
        List authorizationResults = ((Authorizer)authorizerPlugin.get().get()).authorize(context, actions);
        HashSet deniedTopics = new HashSet();
        IntStream.range(0, actions.size()).forEach(i -> {
            if (authorizationResults.get(i) == AuthorizationResult.DENIED) {
                String deniedTopic = ((Action)actions.get(i)).resourcePattern().name();
                deniedTopics.add(deniedTopic);
            }
        });
        resolvedRegexes.forEach((__, topicNames) -> topicNames.removeAll(deniedTopics));
    }

    private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResult(String groupId, String memberId, Map<String, ResolvedRegularExpression> resolvedRegularExpressions, Throwable exception) {
        if (exception != null) {
            this.log.error("[GroupId {}] Couldn't update regular expression due to: {}", (Object)groupId, (Object)exception.getMessage());
            return new CoordinatorResult(List.of());
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("[GroupId {}] Received updated regular expressions based on the context of member {}: {}.", new Object[]{groupId, memberId, resolvedRegularExpressions});
        }
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        try {
            ConsumerGroup group = this.consumerGroup(groupId);
            HashMap<String, SubscriptionCount> subscribedTopicNames = new HashMap<String, SubscriptionCount>(group.subscribedTopicNames());
            boolean bumpGroupEpoch = false;
            for (Map.Entry<String, ResolvedRegularExpression> entry : resolvedRegularExpressions.entrySet()) {
                String regex = entry.getKey();
                if (group.numSubscribedMembers(regex) == 0) continue;
                ResolvedRegularExpression newResolvedRegularExpression = entry.getValue();
                ResolvedRegularExpression oldResolvedRegularExpression = group.resolvedRegularExpression(regex).orElse(ResolvedRegularExpression.EMPTY);
                if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) {
                    bumpGroupEpoch = true;
                    oldResolvedRegularExpression.topics.forEach(topicName -> subscribedTopicNames.compute((String)topicName, SubscriptionCount::decRegexCount));
                    newResolvedRegularExpression.topics.forEach(topicName -> subscribedTopicNames.compute((String)topicName, SubscriptionCount::incRegexCount));
                }
                records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionRecord(groupId, regex, newResolvedRegularExpression));
            }
            long groupMetadataHash = ModernGroup.computeMetadataHash(subscribedTopicNames, this.topicHashCache, this.metadataImage);
            if (groupMetadataHash != group.metadataHash()) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("[GroupId {}] Computed new metadata hash: {}.", (Object)groupId, (Object)groupMetadataHash);
                }
                bumpGroupEpoch = true;
            }
            if (bumpGroupEpoch) {
                int groupEpoch = group.groupEpoch() + 1;
                records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
                this.log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", new Object[]{groupId, groupEpoch, groupMetadataHash});
                this.metrics.record("ConsumerGroupRebalances");
                group.setMetadataRefreshDeadline(this.time.milliseconds() + Integer.MAX_VALUE, groupEpoch);
            }
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("[GroupId {}] Received result of regular expression resolution but it no longer exists.", (Object)groupId);
        }
        return new CoordinatorResult(records);
    }

    private boolean hasMemberSubscriptionChanged(String groupId, ShareGroupMember member, ShareGroupMember updatedMember, List<CoordinatorRecord> records) {
        String memberId = updatedMember.memberId();
        if (!updatedMember.equals(member)) {
            records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord(groupId, updatedMember));
            if (!updatedMember.subscribedTopicNames().equals(member.subscribedTopicNames())) {
                this.log.info("[GroupId {}] Member {} updated its subscribed topics to: {}.", new Object[]{groupId, memberId, updatedMember.subscribedTopicNames()});
                return true;
            }
        }
        return false;
    }

    private boolean hasStreamsMemberMetadataChanged(String groupId, StreamsGroupMember member, StreamsGroupMember updatedMember, List<CoordinatorRecord> records) {
        String memberId = updatedMember.memberId();
        if (!updatedMember.equals(member)) {
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, updatedMember));
            this.log.info("[GroupId {}] Member {} updated its member metdata to {}.", new Object[]{groupId, memberId, updatedMember});
            return true;
        }
        return false;
    }

    private ConsumerGroupMember maybeReconcile(String groupId, ConsumerGroupMember member, BiFunction<Uuid, Integer, Integer> currentPartitionEpoch, int targetAssignmentEpoch, Assignment targetAssignment, List<ConsumerGroupHeartbeatRequestData.TopicPartitions> ownedTopicPartitions, List<CoordinatorRecord> records) {
        if (member.isReconciledTo(targetAssignmentEpoch)) {
            return member;
        }
        ConsumerGroupMember updatedMember = new org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder(member).withTargetAssignment(targetAssignmentEpoch, targetAssignment).withCurrentPartitionEpoch(currentPartitionEpoch).withOwnedTopicPartitions(ownedTopicPartitions).build();
        if (!updatedMember.equals(member)) {
            records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, updatedMember));
            if (this.log.isDebugEnabled()) {
                this.log.debug("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, assignedPartitions={} and revokedPartitions={}.", new Object[]{groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), Utils.assignmentToString(updatedMember.assignedPartitions()), Utils.assignmentToString(updatedMember.partitionsPendingRevocation())});
            }
            if (!updatedMember.useClassicProtocol()) {
                if (updatedMember.state() == MemberState.UNREVOKED_PARTITIONS) {
                    this.scheduleConsumerGroupRebalanceTimeout(groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.rebalanceTimeoutMs());
                } else {
                    this.cancelGroupRebalanceTimeout(groupId, updatedMember.memberId());
                }
            }
        }
        return updatedMember;
    }

    private ShareGroupMember maybeReconcile(String groupId, ShareGroupMember member, int targetAssignmentEpoch, Assignment targetAssignment, List<CoordinatorRecord> records) {
        if (member.isReconciledTo(targetAssignmentEpoch)) {
            return member;
        }
        ShareGroupMember updatedMember = new ShareGroupAssignmentBuilder(member).withTargetAssignment(targetAssignmentEpoch, targetAssignment).build();
        if (!updatedMember.equals(member)) {
            records.add(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentRecord(groupId, updatedMember));
            this.log.info("[GroupId {}] Member {} new assignment state: epoch={}, previousEpoch={}, state={}, assignedPartitions={}.", new Object[]{groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), Utils.assignmentToString(updatedMember.assignedPartitions())});
        }
        return updatedMember;
    }

    private StreamsGroupMember maybeReconcile(String groupId, StreamsGroupMember member, BiFunction<String, Integer, String> currentActiveTaskProcessId, BiFunction<String, Integer, Set<String>> currentStandbyTaskProcessIds, BiFunction<String, Integer, Set<String>> currentWarmupTaskProcessIds, int targetAssignmentEpoch, TasksTuple targetAssignment, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedActiveTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedStandbyTasks, List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks, List<CoordinatorRecord> records) {
        StreamsGroupMember updatedMember;
        if (member.isReconciledTo(targetAssignmentEpoch)) {
            return member;
        }
        TasksTuple ownedTasks = null;
        if (ownedActiveTasks != null && ownedStandbyTasks != null && ownedWarmupTasks != null) {
            ownedTasks = TasksTuple.fromHeartbeatRequest(ownedActiveTasks, ownedStandbyTasks, ownedWarmupTasks);
        }
        if (!(updatedMember = new CurrentAssignmentBuilder(member).withTargetAssignment(targetAssignmentEpoch, targetAssignment).withCurrentActiveTaskProcessId(currentActiveTaskProcessId).withCurrentStandbyTaskProcessIds(currentStandbyTaskProcessIds).withCurrentWarmupTaskProcessIds(currentWarmupTaskProcessIds).withOwnedAssignment(ownedTasks).build()).equals(member)) {
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, updatedMember));
            this.log.info("[GroupId {}][MemberId {}] Member's new assignment state: epoch={}, previousEpoch={}, state={}, assignedTasks={} and tasksPendingRevocation={}.", new Object[]{groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.previousMemberEpoch(), updatedMember.state(), updatedMember.assignedTasks().toString(), updatedMember.tasksPendingRevocation().toString()});
            if (updatedMember.state() == org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) {
                this.scheduleStreamsGroupRebalanceTimeout(groupId, updatedMember.memberId(), updatedMember.memberEpoch(), updatedMember.rebalanceTimeoutMs());
            } else {
                this.cancelGroupRebalanceTimeout(groupId, updatedMember.memberId());
            }
        }
        return updatedMember;
    }

    private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(ConsumerGroup group, boolean bumpGroupEpoch, ConsumerGroupMember member, ConsumerGroupMember updatedMember, List<CoordinatorRecord> records) {
        long currentTimeMs = this.time.milliseconds();
        String groupId = group.groupId();
        int groupEpoch = group.groupEpoch();
        Map<String, Integer> subscribedRegularExpressions = group.computeSubscribedRegularExpressions(member, updatedMember);
        Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
        long groupMetadataHash = ModernGroup.computeMetadataHash(subscribedTopicNamesMap, this.topicHashCache, this.metadataImage);
        int numMembers = group.numMembers();
        if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
            ++numMembers;
        }
        SubscriptionType subscriptionType = ConsumerGroup.subscriptionType(subscribedRegularExpressions, subscribedTopicNamesMap, numMembers);
        if (groupMetadataHash != group.metadataHash()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("[GroupId {}] Computed new metadata hash: {}.", (Object)groupId, (Object)groupMetadataHash);
            }
            bumpGroupEpoch = true;
        }
        if (bumpGroupEpoch) {
            records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, ++groupEpoch, groupMetadataHash));
            this.log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", new Object[]{groupId, groupEpoch, groupMetadataHash});
            this.metrics.record("ConsumerGroupRebalances");
        }
        group.setMetadataRefreshDeadline(currentTimeMs + Integer.MAX_VALUE, groupEpoch);
        if (group.hasSubscriptionMetadataRecord()) {
            records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
        }
        return new UpdateSubscriptionMetadataResult(groupEpoch, subscriptionType);
    }

    private Assignment updateTargetAssignment(ConsumerGroup group, int groupEpoch, ConsumerGroupMember member, ConsumerGroupMember updatedMember, SubscriptionType subscriptionType, List<CoordinatorRecord> records) {
        String preferredServerAssignor = group.computePreferredServerAssignor(member, updatedMember).orElse(this.defaultConsumerGroupAssignor.name());
        try {
            TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder assignmentResultBuilder = (TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)((TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder)new TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder(group.groupId(), groupEpoch, (PartitionAssignor)this.consumerGroupAssignors.get(preferredServerAssignor)).withMembers(group.members())).withStaticMembers(group.staticMembers())).withSubscriptionType(subscriptionType)).withTargetAssignment(group.targetAssignment())).withInvertedTargetAssignment(group.invertedTargetAssignment())).withMetadataImage(this.metadataImage)).withResolvedRegularExpressions(group.resolvedRegularExpressions()).addOrUpdateMember(updatedMember.memberId(), updatedMember);
            String previousMemberId = group.staticMemberId(updatedMember.instanceId());
            if (previousMemberId != null && !updatedMember.memberId().equals(previousMemberId)) {
                assignmentResultBuilder.removeMember(previousMemberId);
            }
            long startTimeMs = this.time.milliseconds();
            TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = assignmentResultBuilder.build();
            long assignorTimeMs = this.time.milliseconds() - startTimeMs;
            if (this.log.isDebugEnabled()) {
                this.log.debug("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.", new Object[]{group.groupId(), groupEpoch, preferredServerAssignor, assignorTimeMs, assignmentResult.targetAssignment()});
            } else {
                this.log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms.", new Object[]{group.groupId(), groupEpoch, preferredServerAssignor, assignorTimeMs});
            }
            records.addAll(assignmentResult.records());
            MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
            if (newMemberAssignment != null) {
                return new Assignment(newMemberAssignment.partitions());
            }
            return Assignment.EMPTY;
        }
        catch (PartitionAssignorException ex) {
            String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", groupEpoch, ex.getMessage());
            this.log.error("[GroupId {}] {}.", (Object)group.groupId(), (Object)msg);
            throw new UnknownServerException(msg, (Throwable)ex);
        }
    }

    private Assignment updateTargetAssignment(ShareGroup group, int groupEpoch, ShareGroupMember updatedMember, SubscriptionType subscriptionType, List<CoordinatorRecord> records) {
        try {
            Map<Uuid, Set<Integer>> initializedTopicPartitions = this.shareGroupStatePartitionMetadata.containsKey((Object)group.groupId()) ? GroupMetadataManager.stripInitValue(((ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)group.groupId())).initializedTopics()) : Map.of();
            TargetAssignmentBuilder.ShareTargetAssignmentBuilder assignmentResultBuilder = (TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)((TargetAssignmentBuilder.ShareTargetAssignmentBuilder)new TargetAssignmentBuilder.ShareTargetAssignmentBuilder(group.groupId(), groupEpoch, (PartitionAssignor)this.shareGroupAssignor).withMembers(group.members())).withSubscriptionType(subscriptionType)).withTargetAssignment(group.targetAssignment())).withTopicAssignablePartitionsMap(initializedTopicPartitions)).withInvertedTargetAssignment(group.invertedTargetAssignment())).withMetadataImage(this.metadataImage)).addOrUpdateMember(updatedMember.memberId(), updatedMember);
            long startTimeMs = this.time.milliseconds();
            TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = assignmentResultBuilder.build();
            long assignorTimeMs = this.time.milliseconds() - startTimeMs;
            if (this.log.isDebugEnabled()) {
                this.log.debug("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.", new Object[]{group.groupId(), groupEpoch, this.shareGroupAssignor, assignorTimeMs, assignmentResult.targetAssignment()});
            } else {
                this.log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms.", new Object[]{group.groupId(), groupEpoch, this.shareGroupAssignor, assignorTimeMs});
            }
            records.addAll(assignmentResult.records());
            MemberAssignment newMemberAssignment = assignmentResult.targetAssignment().get(updatedMember.memberId());
            if (newMemberAssignment != null) {
                return new Assignment(newMemberAssignment.partitions());
            }
            return Assignment.EMPTY;
        }
        catch (PartitionAssignorException ex) {
            String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", groupEpoch, ex.getMessage());
            this.log.error("[GroupId {}] {}.", (Object)group.groupId(), (Object)msg);
            throw new UnknownServerException(msg, (Throwable)ex);
        }
    }

    private TasksTuple updateStreamsTargetAssignment(StreamsGroup group, int groupEpoch, StreamsGroupMember updatedMember, ConfiguredTopology configuredTopology, MetadataImage metadataImage, List<CoordinatorRecord> records) {
        TaskAssignor assignor = this.streamsGroupAssignor(group.groupId());
        Map<String, String> assignmentConfigs = this.streamsGroupAssignmentConfigs(group.groupId());
        try {
            TargetAssignmentBuilder assignmentResultBuilder = new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignor, assignmentConfigs).withMembers(group.members()).withTopology(configuredTopology).withStaticMembers(group.staticMembers()).withMetadataImage(metadataImage).withTargetAssignment(group.targetAssignment()).addOrUpdateMember(updatedMember.memberId(), updatedMember);
            long startTimeMs = this.time.milliseconds();
            TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = assignmentResultBuilder.build();
            long assignorTimeMs = this.time.milliseconds() - startTimeMs;
            if (this.log.isDebugEnabled()) {
                this.log.debug("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms: {}.", new Object[]{group.groupId(), groupEpoch, assignor, assignorTimeMs, assignmentResult.targetAssignment()});
            } else {
                this.log.info("[GroupId {}] Computed a new target assignment for epoch {} with '{}' assignor in {}ms.", new Object[]{group.groupId(), groupEpoch, assignor, assignorTimeMs});
            }
            records.addAll(assignmentResult.records());
            return assignmentResult.targetAssignment().get(updatedMember.memberId());
        }
        catch (TaskAssignorException ex) {
            String msg = String.format("Failed to compute a new target assignment for epoch %d: %s", groupEpoch, ex.getMessage());
            this.log.error("[GroupId {}] {}.", (Object)group.groupId(), (Object)msg);
            throw new UnknownServerException(msg, (Throwable)((Object)ex));
        }
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupLeave(String groupId, String instanceId, String memberId, int memberEpoch) throws ApiException {
        ConsumerGroup group = this.consumerGroup(groupId);
        ConsumerGroupHeartbeatResponseData response = new ConsumerGroupHeartbeatResponseData().setMemberId(memberId).setMemberEpoch(memberEpoch);
        if (instanceId == null) {
            ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
            this.log.info("[GroupId {}] Member {} left the consumer group.", (Object)groupId, (Object)memberId);
            return this.consumerGroupFenceMember(group, member, response);
        }
        ConsumerGroupMember member = group.staticMember(instanceId);
        this.throwIfStaticMemberIsUnknown(member, instanceId);
        this.throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
        if (memberEpoch == -2) {
            this.log.info("[GroupId {}] Static member {} with instance id {} temporarily left the consumer group.", new Object[]{group.groupId(), memberId, instanceId});
            return this.consumerGroupStaticMemberGroupLeave(group, member);
        }
        this.log.info("[GroupId {}] Static member {} with instance id {} left the consumer group.", new Object[]{group.groupId(), memberId, instanceId});
        return this.consumerGroupFenceMember(group, member, response);
    }

    private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupLeave(String groupId, String instanceId, String memberId, int memberEpoch, boolean shutdownApplication) throws ApiException {
        StreamsGroup group = this.streamsGroup(groupId);
        if (shutdownApplication) {
            group.setShutdownRequestMemberId(memberId);
        }
        StreamsGroupHeartbeatResponseData response = new StreamsGroupHeartbeatResponseData().setMemberId(memberId).setMemberEpoch(memberEpoch);
        if (instanceId == null) {
            StreamsGroupMember member = group.getMemberOrThrow(memberId);
            this.log.info("[GroupId {}][MemberId {}] Member {} left the streams group.", new Object[]{groupId, memberId, memberId});
            return this.streamsGroupFenceMember(group, member, new StreamsGroupHeartbeatResult(response, Map.of()));
        }
        throw new UnsupportedOperationException("Static members are not supported in streams groups.");
    }

    private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupStaticMemberGroupLeave(ConsumerGroup group, ConsumerGroupMember member) {
        ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member).setMemberEpoch(-2).setPartitionsPendingRevocation(Map.of()).build();
        return new CoordinatorResult(List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(group.groupId(), leavingStaticMember)), (Object)new ConsumerGroupHeartbeatResponseData().setMemberId(member.memberId()).setMemberEpoch(-2));
    }

    private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupLeave(String groupId, String memberId, int memberEpoch) throws ApiException {
        ShareGroup group = this.shareGroup(groupId);
        ShareGroupHeartbeatResponseData response = new ShareGroupHeartbeatResponseData().setMemberId(memberId).setMemberEpoch(memberEpoch);
        ShareGroupMember member = group.getOrMaybeCreateMember(memberId, false);
        this.log.info("[GroupId {}] Member {} left the share group.", (Object)groupId, (Object)memberId);
        return this.shareGroupFenceMember(group, member, response);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMember(ConsumerGroup group, ConsumerGroupMember member, T response) {
        return this.consumerGroupFenceMembers(group, Set.of(member), response);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(ConsumerGroup group, Set<ConsumerGroupMember> members, T response) {
        if (members.isEmpty()) {
            return new CoordinatorResult(List.of(), response);
        }
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        if (this.validateOnlineDowngradeWithFencedMembers(group, members)) {
            this.convertToClassicGroup(group, members, null, records);
            return new CoordinatorResult(records, response, null, false);
        }
        for (ConsumerGroupMember member : members) {
            this.removeMember(records, group.groupId(), member.memberId());
        }
        Set<String> deletedRegexes = this.maybeDeleteResolvedRegularExpressions(records, group, members);
        Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNamesWithoutDeletedMembers(members, deletedRegexes);
        long groupMetadataHash = ModernGroup.computeMetadataHash(subscribedTopicNamesMap, this.topicHashCache, this.metadataImage);
        if (groupMetadataHash != group.metadataHash() && this.log.isDebugEnabled()) {
            this.log.debug("[GroupId {}] Computed new metadata hash: {}.", (Object)group.groupId(), (Object)groupMetadataHash);
        }
        int groupEpoch = group.groupEpoch() + 1;
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
        this.log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", new Object[]{group.groupId(), groupEpoch, groupMetadataHash});
        for (ConsumerGroupMember member : members) {
            this.cancelTimers(group.groupId(), member.memberId());
        }
        return new CoordinatorResult(records, response);
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(ShareGroup group, ShareGroupMember member, T response) {
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        records.add(GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(group.groupId(), member.memberId()));
        records.add(GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(group.groupId(), member.memberId()));
        records.add(GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(group.groupId(), member.memberId()));
        long groupMetadataHash = ModernGroup.computeMetadataHash(group.computeSubscribedTopicNames(member, null), this.topicHashCache, this.metadataImage);
        if (groupMetadataHash != group.metadataHash()) {
            this.log.info("[GroupId {}] Computed new metadata hash: {}.", (Object)group.groupId(), (Object)groupMetadataHash);
        }
        int groupEpoch = group.groupEpoch() + 1;
        records.add(GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
        this.cancelGroupSessionTimeout(group.groupId(), member.memberId());
        return new CoordinatorResult(records, response);
    }

    private void replaceMember(List<CoordinatorRecord> records, ConsumerGroup group, ConsumerGroupMember oldMember, ConsumerGroupMember newMember) {
        String groupId = group.groupId();
        this.removeMember(records, groupId, oldMember.memberId());
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, newMember));
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMember.memberId(), group.targetAssignment(oldMember.memberId()).partitions()));
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, newMember));
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(StreamsGroup group, StreamsGroupMember member, T response) {
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        records.addAll(this.removeStreamsMember(group.groupId(), member.memberId()));
        int groupEpoch = group.groupEpoch() + 1;
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0L));
        this.cancelTimers(group.groupId(), member.memberId());
        return new CoordinatorResult(records, response);
    }

    private List<CoordinatorRecord> removeStreamsMember(String groupId, String memberId) {
        return List.of(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId));
    }

    private Set<String> maybeDeleteResolvedRegularExpressions(List<CoordinatorRecord> records, ConsumerGroup group, Set<ConsumerGroupMember> members) {
        HashMap<String, Integer> counts = new HashMap<String, Integer>();
        members.forEach(member -> {
            if (GroupMetadataManager.isNotEmpty(member.subscribedTopicRegex())) {
                counts.compute(member.subscribedTopicRegex(), Utils::incValue);
            }
        });
        HashSet<String> deletedRegexes = new HashSet<String>();
        counts.forEach((regex, count) -> {
            if (group.numSubscribedMembers((String)regex) == count.intValue()) {
                records.add(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(group.groupId(), regex));
                deletedRegexes.add((String)regex);
            }
        });
        return deletedRegexes;
    }

    private void removeMember(List<CoordinatorRecord> records, String groupId, String memberId) {
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId));
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId));
        records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId));
    }

    private void cancelTimers(String groupId, String memberId) {
        this.cancelGroupSessionTimeout(groupId, memberId);
        this.cancelGroupRebalanceTimeout(groupId, memberId);
        this.cancelConsumerGroupJoinTimeout(groupId, memberId);
        this.cancelConsumerGroupSyncTimeout(groupId, memberId);
    }

    private void scheduleStreamsGroupSessionTimeout(String groupId, String memberId) {
        this.timer.schedule(GroupMetadataManager.groupSessionTimeoutKey(groupId, memberId), (long)this.streamsGroupSessionTimeoutMs(groupId), TimeUnit.MILLISECONDS, true, () -> this.streamsGroupFenceMemberOperation(groupId, memberId, "the member session expired."));
    }

    private void scheduleConsumerGroupSessionTimeout(String groupId, String memberId) {
        this.scheduleConsumerGroupSessionTimeout(groupId, memberId, this.consumerGroupSessionTimeoutMs(groupId));
    }

    private void scheduleShareGroupSessionTimeout(String groupId, String memberId) {
        this.scheduleShareGroupSessionTimeout(groupId, memberId, this.shareGroupSessionTimeoutMs(groupId));
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMemberOperation(String groupId, String memberId, String reason) {
        try {
            ConsumerGroup group = this.consumerGroup(groupId);
            ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
            this.log.info("[GroupId {}] Member {} fenced from the group because {}.", new Object[]{groupId, memberId, reason});
            return this.consumerGroupFenceMember(group, member, null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("[GroupId {}] Could not fence {} because the group does not exist.", (Object)groupId, (Object)memberId);
        }
        catch (UnknownMemberIdException ex) {
            this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", (Object)groupId, (Object)memberId);
        }
        return new CoordinatorResult(List.of());
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMemberOperation(String groupId, String memberId, String reason) {
        try {
            ShareGroup group = this.shareGroup(groupId);
            ShareGroupMember member = group.getOrMaybeCreateMember(memberId, false);
            this.log.info("[GroupId {}] Member {} fenced from the group because {}.", new Object[]{groupId, memberId, reason});
            return this.shareGroupFenceMember(group, member, null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("[GroupId {}] Could not fence {} because the group does not exist.", (Object)groupId, (Object)memberId);
        }
        catch (UnknownMemberIdException ex) {
            this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", (Object)groupId, (Object)memberId);
        }
        return new CoordinatorResult(List.of());
    }

    private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMemberOperation(String groupId, String memberId, String reason) {
        try {
            StreamsGroup group = this.streamsGroup(groupId);
            StreamsGroupMember member = group.getMemberOrThrow(memberId);
            this.log.info("[GroupId {}] Streams member {} fenced from the group because {}.", new Object[]{groupId, memberId, reason});
            return this.streamsGroupFenceMember(group, member, null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("[GroupId {}] Could not fence streams member {} because the group does not exist.", (Object)groupId, (Object)memberId);
        }
        catch (UnknownMemberIdException ex) {
            this.log.debug("[GroupId {}] Could not fence streams member {} because the member does not exist.", (Object)groupId, (Object)memberId);
        }
        return new CoordinatorResult(List.of());
    }

    private void scheduleConsumerGroupSessionTimeout(String groupId, String memberId, int sessionTimeoutMs) {
        this.timer.schedule(GroupMetadataManager.groupSessionTimeoutKey(groupId, memberId), (long)sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> this.consumerGroupFenceMemberOperation(groupId, memberId, "the member session expired"));
    }

    private void scheduleShareGroupSessionTimeout(String groupId, String memberId, int sessionTimeoutMs) {
        this.timer.schedule(GroupMetadataManager.groupSessionTimeoutKey(groupId, memberId), (long)sessionTimeoutMs, TimeUnit.MILLISECONDS, true, () -> this.shareGroupFenceMemberOperation(groupId, memberId, "the member session expired"));
    }

    private void cancelGroupSessionTimeout(String groupId, String memberId) {
        this.timer.cancel(GroupMetadataManager.groupSessionTimeoutKey(groupId, memberId));
    }

    private void scheduleConsumerGroupRebalanceTimeout(String groupId, String memberId, int memberEpoch, int rebalanceTimeoutMs) {
        String key = GroupMetadataManager.groupRebalanceTimeoutKey(groupId, memberId);
        this.timer.schedule(key, (long)rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
            try {
                ConsumerGroup group = this.consumerGroup(groupId);
                ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
                if (member.memberEpoch() == memberEpoch) {
                    this.log.info("[GroupId {}] Member {} fenced from the group because it failed to transition from epoch {} within {}ms.", new Object[]{groupId, memberId, memberEpoch, rebalanceTimeoutMs});
                    return this.consumerGroupFenceMember(group, member, null);
                }
                this.log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member left the epoch {}.", new Object[]{groupId, memberId, memberEpoch});
                return new CoordinatorResult(List.of());
            }
            catch (GroupIdNotFoundException ex) {
                this.log.debug("[GroupId {}] Could not fence {}} because the group does not exist.", (Object)groupId, (Object)memberId);
            }
            catch (UnknownMemberIdException ex) {
                this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", (Object)groupId, (Object)memberId);
            }
            return new CoordinatorResult(List.of());
        });
    }

    private void scheduleStreamsGroupRebalanceTimeout(String groupId, String memberId, int memberEpoch, int rebalanceTimeoutMs) {
        String key = GroupMetadataManager.groupRebalanceTimeoutKey(groupId, memberId);
        this.timer.schedule(key, (long)rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> {
            try {
                StreamsGroup group = this.streamsGroup(groupId);
                StreamsGroupMember member = group.getMemberOrThrow(memberId);
                if (member.memberEpoch() == memberEpoch) {
                    this.log.info("[GroupId {}] Member {} fenced from the group because it failed to transition from epoch {} within {}ms.", new Object[]{groupId, memberId, memberEpoch, rebalanceTimeoutMs});
                    return this.streamsGroupFenceMember(group, member, null);
                }
                this.log.debug("[GroupId {}] Ignoring rebalance timeout for {} because the member is not in epoch {} anymore.", new Object[]{groupId, memberId, memberEpoch});
                return new CoordinatorResult(List.of());
            }
            catch (GroupIdNotFoundException ex) {
                this.log.debug("[GroupId {}] Could not fence {}} because the group does not exist.", (Object)groupId, (Object)memberId);
            }
            catch (UnknownMemberIdException ex) {
                this.log.debug("[GroupId {}] Could not fence {} because the member does not exist.", (Object)groupId, (Object)memberId);
            }
            return new CoordinatorResult(List.of());
        });
    }

    private void cancelGroupRebalanceTimeout(String groupId, String memberId) {
        this.timer.cancel(GroupMetadataManager.groupRebalanceTimeoutKey(groupId, memberId));
    }

    private void scheduleConsumerGroupJoinTimeoutIfAbsent(String groupId, String memberId, int rebalanceTimeoutMs) {
        this.timer.scheduleIfAbsent(GroupMetadataManager.consumerGroupJoinKey(groupId, memberId), (long)rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> this.consumerGroupFenceMemberOperation(groupId, memberId, "the classic member failed to join within the rebalance timeout"));
    }

    private void cancelConsumerGroupJoinTimeout(String groupId, String memberId) {
        this.timer.cancel(GroupMetadataManager.consumerGroupJoinKey(groupId, memberId));
    }

    private void scheduleConsumerGroupSyncTimeout(String groupId, String memberId, int rebalanceTimeoutMs) {
        this.timer.schedule(GroupMetadataManager.consumerGroupSyncKey(groupId, memberId), (long)rebalanceTimeoutMs, TimeUnit.MILLISECONDS, true, () -> this.consumerGroupFenceMemberOperation(groupId, memberId, "the member failed to sync within timeout"));
    }

    private void cancelConsumerGroupSyncTimeout(String groupId, String memberId) {
        this.timer.cancel(GroupMetadataManager.consumerGroupSyncKey(groupId, memberId));
    }

    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(AuthorizableRequestContext context, ConsumerGroupHeartbeatRequestData request) throws ApiException {
        if (request.memberEpoch() == -1 || request.memberEpoch() == -2) {
            return this.consumerGroupLeave(request.groupId(), request.instanceId(), request.memberId(), request.memberEpoch());
        }
        return this.consumerGroupHeartbeat(context, request.groupId(), request.memberId(), request.memberEpoch(), request.instanceId(), request.rackId(), request.rebalanceTimeoutMs(), request.subscribedTopicNames(), request.subscribedTopicRegex(), request.serverAssignor(), request.topicPartitions());
    }

    public CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> streamsGroupHeartbeat(AuthorizableRequestContext context, StreamsGroupHeartbeatRequestData request) throws ApiException {
        if (request.memberEpoch() == -1 || request.memberEpoch() == -2) {
            return this.streamsGroupLeave(request.groupId(), request.instanceId(), request.memberId(), request.memberEpoch(), request.shutdownApplication());
        }
        return this.streamsGroupHeartbeat(request.groupId(), request.memberId(), request.memberEpoch(), request.instanceId(), request.rackId(), request.rebalanceTimeoutMs(), context.clientId(), context.clientAddress().toString(), request.topology(), request.activeTasks(), request.standbyTasks(), request.warmupTasks(), request.processId(), request.userEndpoint(), request.clientTags(), request.shutdownApplication(), request.endpointInformationEpoch());
    }

    public void replay(StreamsGroupTopologyKey key, StreamsGroupTopologyValue value) {
        StreamsGroup streamsGroup;
        String groupId = key.groupId();
        try {
            streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            return;
        }
        Set<String> oldSubscribedTopicNames = streamsGroup.topology().isPresent() ? streamsGroup.topology().get().requiredTopics() : Set.of();
        if (value != null) {
            StreamsTopology topology = StreamsTopology.fromRecord(value);
            streamsGroup.setTopology(topology);
            Set<String> newSubscribedTopicNames = topology.requiredTopics();
            this.updateGroupsByTopics(groupId, oldSubscribedTopicNames, newSubscribedTopicNames);
        } else {
            this.updateGroupsByTopics(groupId, oldSubscribedTopicNames, Set.of());
            streamsGroup.setTopology(null);
        }
    }

    public CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<InitializeShareGroupStateParameters>>, CoordinatorRecord> shareGroupHeartbeat(AuthorizableRequestContext context, ShareGroupHeartbeatRequestData request) throws ApiException {
        if (request.memberEpoch() == -1) {
            CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result = this.shareGroupLeave(request.groupId(), request.memberId(), request.memberEpoch());
            return new CoordinatorResult(result.records(), Map.entry((ShareGroupHeartbeatResponseData)result.response(), Optional.empty()));
        }
        return this.shareGroupHeartbeat(request.groupId(), request.memberId(), request.memberEpoch(), request.rackId(), context.clientId(), context.clientAddress().toString(), request.subscribedTopicNames());
    }

    public CoordinatorResult<Void, CoordinatorRecord> initializeShareGroupState(String groupId, Map<Uuid, Set<Integer>> topicPartitionMap) {
        if (topicPartitionMap == null || topicPartitionMap.isEmpty()) {
            return new CoordinatorResult(List.of(), null);
        }
        ShareGroup group = (ShareGroup)this.groups.get((Object)groupId);
        ShareGroup.ShareGroupStatePartitionMetadataInfo currentMap = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        Map<Uuid, ShareGroup.InitMapValue> enrichedTopicPartitionMap = this.attachInitValue(topicPartitionMap);
        if (currentMap == null) {
            return new CoordinatorResult(List.of(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(group.groupId(), Map.of(), enrichedTopicPartitionMap, Map.of())), null);
        }
        Map<Uuid, ShareGroup.InitMapValue> finalInitializedMap = GroupMetadataManager.combineInitMaps(currentMap.initializedTopics(), enrichedTopicPartitionMap);
        HashMap<Uuid, ShareGroup.InitMapValue> finalInitializingMap = new HashMap<Uuid, ShareGroup.InitMapValue>();
        currentMap.initializingTopics().forEach((k, v) -> finalInitializingMap.put((Uuid)k, new ShareGroup.InitMapValue(v.name(), new HashSet<Integer>(v.partitions()), v.timestamp())));
        for (Map.Entry<Uuid, Set<Integer>> entry : topicPartitionMap.entrySet()) {
            Uuid topicId = entry.getKey();
            if (!finalInitializingMap.containsKey(topicId)) continue;
            Set<Integer> partitions = ((ShareGroup.InitMapValue)finalInitializingMap.get(topicId)).partitions();
            partitions.removeAll((Collection)entry.getValue());
            if (!partitions.isEmpty()) continue;
            finalInitializingMap.remove(topicId);
        }
        return new CoordinatorResult(List.of(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(group.groupId(), finalInitializingMap, finalInitializedMap, this.attachTopicName(currentMap.deletingTopics()))), null);
    }

    public CoordinatorResult<Void, CoordinatorRecord> uninitializeShareGroupState(String groupId, Map<Uuid, Set<Integer>> topicPartitionMap) {
        ShareGroup.ShareGroupStatePartitionMetadataInfo info = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        if (info == null || info.initializingTopics().isEmpty() || topicPartitionMap.isEmpty()) {
            return new CoordinatorResult(List.of(), null);
        }
        Map<Uuid, ShareGroup.InitMapValue> initializingTopics = info.initializingTopics();
        HashMap<Uuid, ShareGroup.InitMapValue> finalInitializingTopics = new HashMap<Uuid, ShareGroup.InitMapValue>();
        for (Map.Entry<Uuid, ShareGroup.InitMapValue> entry : initializingTopics.entrySet()) {
            Uuid topicId = entry.getKey();
            if (!topicPartitionMap.containsKey(topicId)) {
                finalInitializingTopics.put(entry.getKey(), entry.getValue());
                continue;
            }
            HashSet<Integer> partitions = new HashSet<Integer>(entry.getValue().partitions());
            partitions.removeAll((Collection)topicPartitionMap.get(topicId));
            if (partitions.isEmpty()) continue;
            finalInitializingTopics.put(entry.getKey(), new ShareGroup.InitMapValue(entry.getValue().name(), partitions, entry.getValue().timestamp()));
        }
        return new CoordinatorResult(List.of(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, finalInitializingTopics, info.initializedTopics(), this.attachTopicName(info.deletingTopics()))), null);
    }

    private Map<Uuid, String> attachTopicName(Set<Uuid> topicIds) {
        TopicsImage topicsImage = this.metadataImage.topics();
        HashMap<Uuid, String> finalMap = new HashMap<Uuid, String>();
        for (Uuid topicId : topicIds) {
            TopicImage topicImage = topicsImage.getTopic(topicId);
            String topicName = topicImage != null ? topicImage.name() : "<UNKNOWN>";
            finalMap.put(topicId, topicName);
        }
        return Collections.unmodifiableMap(finalMap);
    }

    private Map<Uuid, ShareGroup.InitMapValue> attachInitValue(Map<Uuid, Set<Integer>> initMap) {
        TopicsImage topicsImage = this.metadataImage.topics();
        HashMap<Uuid, ShareGroup.InitMapValue> finalMap = new HashMap<Uuid, ShareGroup.InitMapValue>();
        long timestamp = this.time.milliseconds();
        for (Map.Entry<Uuid, Set<Integer>> entry : initMap.entrySet()) {
            Uuid topicId = entry.getKey();
            TopicImage topicImage = topicsImage.getTopic(topicId);
            String topicName = topicImage != null ? topicImage.name() : "<UNKNOWN>";
            finalMap.put(topicId, new ShareGroup.InitMapValue(topicName, entry.getValue(), timestamp));
        }
        return Collections.unmodifiableMap(finalMap);
    }

    public Map<Uuid, Set<Integer>> initializedShareGroupPartitions(String groupId) {
        HashMap<Uuid, Set<Integer>> resultMap = new HashMap<Uuid, Set<Integer>>();
        ShareGroup.ShareGroupStatePartitionMetadataInfo currentMap = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        if (currentMap != null) {
            currentMap.initializedTopics().forEach((topicId, initMapValue) -> resultMap.put((Uuid)topicId, (Set<Integer>)new HashSet<Integer>(initMapValue.partitions())));
        }
        return resultMap;
    }

    public void replay(ConsumerGroupMemberMetadataKey key, ConsumerGroupMemberMetadataValue value) {
        ConsumerGroup consumerGroup;
        String groupId = key.groupId();
        String memberId = key.memberId();
        try {
            consumerGroup = this.getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            return;
        }
        HashSet<String> oldSubscribedTopicNames = new HashSet<String>(consumerGroup.subscribedTopicNames().keySet());
        if (value != null) {
            ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
            consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember).updateWith(value).build());
        } else {
            ConsumerGroupMember oldMember;
            try {
                oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
            }
            catch (UnknownMemberIdException ex) {
                return;
            }
            if (oldMember.memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupCurrentMemberAssignmentValue tombstone.");
            }
            if (consumerGroup.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
            }
            consumerGroup.removeMember(memberId);
        }
        this.updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet());
    }

    public Set<String> groupsSubscribedToTopic(String topicName) {
        Set<String> groups = (Set<String>)this.groupsByTopics.get((Object)topicName);
        return groups != null ? groups : Set.of();
    }

    private void subscribeGroupToTopic(String groupId, String topicName) {
        ((TimelineHashSet)this.groupsByTopics.computeIfAbsent((Object)topicName, __ -> new TimelineHashSet(this.snapshotRegistry, 1))).add((Object)groupId);
    }

    private void unsubscribeGroupFromTopic(String groupId, String topicName) {
        this.groupsByTopics.computeIfPresent((Object)topicName, (__, groupIds) -> {
            groupIds.remove((Object)groupId);
            if (groupIds.isEmpty()) {
                this.topicHashCache.remove(topicName);
                return null;
            }
            return groupIds;
        });
    }

    private void updateGroupsByTopics(String groupId, Set<String> oldSubscribedTopics, Set<String> newSubscribedTopics) {
        if (oldSubscribedTopics.isEmpty()) {
            newSubscribedTopics.forEach(topicName -> this.subscribeGroupToTopic(groupId, (String)topicName));
        } else if (newSubscribedTopics.isEmpty()) {
            oldSubscribedTopics.forEach(topicName -> this.unsubscribeGroupFromTopic(groupId, (String)topicName));
        } else {
            oldSubscribedTopics.forEach(topicName -> {
                if (!newSubscribedTopics.contains(topicName)) {
                    this.unsubscribeGroupFromTopic(groupId, (String)topicName);
                }
            });
            newSubscribedTopics.forEach(topicName -> {
                if (!oldSubscribedTopics.contains(topicName)) {
                    this.subscribeGroupToTopic(groupId, (String)topicName);
                }
            });
        }
    }

    public void replay(ConsumerGroupMetadataKey key, ConsumerGroupMetadataValue value) {
        String groupId = key.groupId();
        if (value != null) {
            ConsumerGroup consumerGroup = this.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
            consumerGroup.setGroupEpoch(value.epoch());
            consumerGroup.setMetadataHash(value.metadataHash());
        } else {
            ConsumerGroup consumerGroup;
            try {
                consumerGroup = this.getOrMaybeCreatePersistedConsumerGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            if (!consumerGroup.members().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + consumerGroup.members().size() + " members.");
            }
            if (!consumerGroup.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + consumerGroup.targetAssignment().size() + " members.");
            }
            if (consumerGroup.assignmentEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but did not receive ConsumerGroupTargetAssignmentMetadataValue tombstone.");
            }
            this.removeGroup(groupId);
        }
    }

    public void replay(ConsumerGroupPartitionMetadataKey key, ConsumerGroupPartitionMetadataValue value) {
        ConsumerGroup group;
        String groupId = key.groupId();
        try {
            group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            return;
        }
        group.setHasSubscriptionMetadataRecord(value != null);
    }

    public void replay(ConsumerGroupTargetAssignmentMemberKey key, ConsumerGroupTargetAssignmentMemberValue value) {
        String groupId = key.groupId();
        String memberId = key.memberId();
        if (value != null) {
            ConsumerGroup group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
            group.updateTargetAssignment(memberId, Assignment.fromRecord(value));
        } else {
            ConsumerGroup group;
            try {
                group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            group.removeTargetAssignment(memberId);
        }
    }

    public void replay(ConsumerGroupTargetAssignmentMetadataKey key, ConsumerGroupTargetAssignmentMetadataValue value) {
        String groupId = key.groupId();
        if (value != null) {
            ConsumerGroup group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
            group.setTargetAssignmentEpoch(value.assignmentEpoch());
        } else {
            ConsumerGroup group;
            try {
                group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            if (!group.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + group.targetAssignment().size() + " members.");
            }
            group.setTargetAssignmentEpoch(-1);
        }
    }

    public void replay(ConsumerGroupCurrentMemberAssignmentKey key, ConsumerGroupCurrentMemberAssignmentValue value) {
        String groupId = key.groupId();
        String memberId = key.memberId();
        if (value != null) {
            ConsumerGroup group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
            ConsumerGroupMember oldMember = group.getOrMaybeCreateMember(memberId, true);
            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember).updateWith(value).build();
            group.updateMember(newMember);
        } else {
            ConsumerGroupMember oldMember;
            ConsumerGroup group;
            try {
                group = this.getOrMaybeCreatePersistedConsumerGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            try {
                oldMember = group.getOrMaybeCreateMember(memberId, false);
            }
            catch (UnknownMemberIdException ex) {
                return;
            }
            ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember).setMemberEpoch(-1).setPreviousMemberEpoch(-1).setAssignedPartitions(Map.of()).setPartitionsPendingRevocation(Map.of()).build();
            group.updateMember(newMember);
        }
    }

    public void replay(ConsumerGroupRegularExpressionKey key, ConsumerGroupRegularExpressionValue value) {
        ConsumerGroup consumerGroup;
        String groupId = key.groupId();
        String regex = key.regularExpression();
        try {
            consumerGroup = this.getOrMaybeCreatePersistedConsumerGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            return;
        }
        HashSet<String> oldSubscribedTopicNames = new HashSet<String>(consumerGroup.subscribedTopicNames().keySet());
        if (value != null) {
            consumerGroup.updateResolvedRegularExpression(regex, new ResolvedRegularExpression(new HashSet<String>(value.topics()), value.version(), value.timestamp()));
        } else {
            consumerGroup.removeResolvedRegularExpression(regex);
        }
        this.updateGroupsByTopics(groupId, oldSubscribedTopicNames, consumerGroup.subscribedTopicNames().keySet());
    }

    public void replay(StreamsGroupMetadataKey key, StreamsGroupMetadataValue value) {
        String groupId = key.groupId();
        if (value != null) {
            StreamsGroup streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, true);
            streamsGroup.setGroupEpoch(value.epoch());
            streamsGroup.setMetadataHash(value.metadataHash());
        } else {
            StreamsGroup streamsGroup;
            try {
                streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            if (!streamsGroup.members().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + streamsGroup.members().size() + " members.");
            }
            if (streamsGroup.assignmentEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone.");
            }
            this.removeGroup(groupId);
        }
    }

    public void replay(ShareGroupMemberMetadataKey key, ShareGroupMemberMetadataValue value) {
        ShareGroupMember oldMember;
        ShareGroup shareGroup;
        String groupId = key.groupId();
        String memberId = key.memberId();
        try {
            shareGroup = this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
            oldMember = shareGroup.getOrMaybeCreateMember(memberId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupMemberMetadata tombstone without group - {}", (Object)groupId, (Object)ex);
            return;
        }
        catch (UnknownMemberIdException ex) {
            this.log.debug("ShareGroupMemberMetadata tombstone for groupId - {} without member - {}", new Object[]{groupId, memberId, ex});
            return;
        }
        HashSet<String> oldSubscribedTopicNames = new HashSet<String>(shareGroup.subscribedTopicNames().keySet());
        if (value != null) {
            shareGroup.updateMember(new ShareGroupMember.Builder(oldMember).updateWith(value).build());
        } else {
            if (oldMember.memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " with invalid leave group epoch.");
            }
            if (shareGroup.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but member exists in target assignment.");
            }
            shareGroup.removeMember(memberId);
        }
        this.updateGroupsByTopics(groupId, oldSubscribedTopicNames, shareGroup.subscribedTopicNames().keySet());
    }

    public void replay(ShareGroupMetadataKey key, ShareGroupMetadataValue value) {
        ShareGroup shareGroup;
        String groupId = key.groupId();
        try {
            shareGroup = this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupMetadata tombstone without group - {}", (Object)groupId, (Object)ex);
            return;
        }
        if (value != null) {
            shareGroup.setGroupEpoch(value.epoch());
            shareGroup.setMetadataHash(value.metadataHash());
        } else {
            if (!shareGroup.members().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the group still has " + shareGroup.members().size() + " members.");
            }
            if (!shareGroup.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but the target assignment still has " + shareGroup.targetAssignment().size() + " members.");
            }
            if (shareGroup.assignmentEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete group " + groupId + " but target assignment epoch in invalid.");
            }
            this.removeGroup(groupId);
        }
    }

    public void replay(StreamsGroupMemberMetadataKey key, StreamsGroupMemberMetadataValue value) {
        StreamsGroup streamsGroup;
        String groupId = key.groupId();
        String memberId = key.memberId();
        try {
            streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            return;
        }
        if (value != null) {
            StreamsGroupMember oldMember = streamsGroup.getOrCreateUninitializedMember(memberId);
            streamsGroup.updateMember(new StreamsGroupMember.Builder(oldMember).updateWith(value).build());
        } else {
            StreamsGroupMember oldMember;
            try {
                oldMember = streamsGroup.getMemberOrThrow(memberId);
            }
            catch (UnknownMemberIdException ex) {
                return;
            }
            if (oldMember.memberEpoch() != -1) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive StreamsGroupCurrentMemberAssignmentValue tombstone.");
            }
            if (streamsGroup.targetAssignment().containsKey(memberId)) {
                throw new IllegalStateException("Received a tombstone record to delete member " + memberId + " but did not receive StreamsGroupTargetAssignmentMetadataValue tombstone.");
            }
            streamsGroup.removeMember(memberId);
        }
    }

    public void replay(StreamsGroupTargetAssignmentMetadataKey key, StreamsGroupTargetAssignmentMetadataValue value) {
        String groupId = key.groupId();
        if (value != null) {
            StreamsGroup streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, true);
            streamsGroup.setTargetAssignmentEpoch(value.assignmentEpoch());
        } else {
            StreamsGroup streamsGroup;
            try {
                streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            if (!streamsGroup.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + streamsGroup.targetAssignment().size() + " members.");
            }
            streamsGroup.setTargetAssignmentEpoch(-1);
        }
    }

    public void replay(StreamsGroupTargetAssignmentMemberKey key, StreamsGroupTargetAssignmentMemberValue value) {
        String groupId = key.groupId();
        String memberId = key.memberId();
        if (value != null) {
            StreamsGroup streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, true);
            streamsGroup.updateTargetAssignment(memberId, TasksTuple.fromTargetAssignmentRecord(value));
        } else {
            StreamsGroup streamsGroup;
            try {
                streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            streamsGroup.removeTargetAssignment(memberId);
        }
    }

    public void replay(StreamsGroupCurrentMemberAssignmentKey key, StreamsGroupCurrentMemberAssignmentValue value) {
        String groupId = key.groupId();
        String memberId = key.memberId();
        if (value != null) {
            StreamsGroup streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, true);
            StreamsGroupMember oldMember = streamsGroup.getOrCreateUninitializedMember(memberId);
            StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember).updateWith(value).build();
            streamsGroup.updateMember(newMember);
        } else {
            StreamsGroupMember oldMember;
            StreamsGroup streamsGroup;
            try {
                streamsGroup = this.getOrMaybeCreatePersistedStreamsGroup(groupId, false);
            }
            catch (GroupIdNotFoundException ex) {
                return;
            }
            try {
                oldMember = streamsGroup.getMemberOrThrow(memberId);
            }
            catch (UnknownMemberIdException ex) {
                return;
            }
            StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember).setMemberEpoch(-1).setPreviousMemberEpoch(-1).setAssignedTasks(TasksTuple.EMPTY).setTasksPendingRevocation(TasksTuple.EMPTY).build();
            streamsGroup.updateMember(newMember);
        }
    }

    public void replay(ShareGroupTargetAssignmentMemberKey key, ShareGroupTargetAssignmentMemberValue value) {
        ShareGroup group;
        String groupId = key.groupId();
        String memberId = key.memberId();
        try {
            group = this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupTargetAssignmentMember tombstone without group - {}", (Object)groupId, (Object)ex);
            return;
        }
        if (value != null) {
            group.updateTargetAssignment(memberId, Assignment.fromRecord(value));
        } else {
            group.removeTargetAssignment(memberId);
        }
    }

    public void replay(ShareGroupTargetAssignmentMetadataKey key, ShareGroupTargetAssignmentMetadataValue value) {
        ShareGroup group;
        String groupId = key.groupId();
        try {
            group = this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupTargetAssignmentMetadata tombstone without group - {}", (Object)groupId, (Object)ex);
            return;
        }
        if (value != null) {
            group.setTargetAssignmentEpoch(value.assignmentEpoch());
        } else {
            if (!group.targetAssignment().isEmpty()) {
                throw new IllegalStateException("Received a tombstone record to delete target assignment of " + groupId + " but the assignment still has " + group.targetAssignment().size() + " members.");
            }
            group.setTargetAssignmentEpoch(-1);
        }
    }

    public void replay(ShareGroupCurrentMemberAssignmentKey key, ShareGroupCurrentMemberAssignmentValue value) {
        ShareGroupMember oldMember;
        ShareGroup group;
        String groupId = key.groupId();
        String memberId = key.memberId();
        try {
            group = this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
            oldMember = group.getOrMaybeCreateMember(memberId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupCurrentMemberAssignment tombstone without group - {}", (Object)groupId, (Object)ex);
            return;
        }
        catch (UnknownMemberIdException ex) {
            this.log.debug("ShareGroupCurrentMemberAssignment tombstone for groupId - {} without member - {}", new Object[]{groupId, memberId, ex});
            return;
        }
        if (value != null) {
            ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember).updateWith(value).build();
            group.updateMember(newMember);
        } else {
            ShareGroupMember newMember = new ShareGroupMember.Builder(oldMember).setMemberEpoch(-1).setPreviousMemberEpoch(-1).setAssignedPartitions(Map.of()).build();
            group.updateMember(newMember);
        }
    }

    public void replay(ShareGroupStatePartitionMetadataKey key, ShareGroupStatePartitionMetadataValue value) {
        String groupId = key.groupId();
        try {
            this.getOrMaybeCreatePersistedShareGroup(groupId, value != null);
        }
        catch (GroupIdNotFoundException ex) {
            this.log.debug("ShareGroupStatePartitionMetadata tombstone for non-existent share group {}", (Object)groupId, (Object)ex);
        }
        if (value == null) {
            this.shareGroupStatePartitionMetadata.remove((Object)groupId);
        } else {
            long timestamp = this.time.milliseconds();
            ShareGroup.ShareGroupStatePartitionMetadataInfo info = new ShareGroup.ShareGroupStatePartitionMetadataInfo(value.initializingTopics().stream().map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new ShareGroup.InitMapValue(topicPartitionInfo.topicName(), new HashSet<Integer>(topicPartitionInfo.partitions()), timestamp))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), value.initializedTopics().stream().map(topicPartitionInfo -> Map.entry(topicPartitionInfo.topicId(), new ShareGroup.InitMapValue(topicPartitionInfo.topicName(), new HashSet<Integer>(topicPartitionInfo.partitions()), timestamp))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), value.deletingTopics().stream().map(ShareGroupStatePartitionMetadataValue.TopicInfo::topicId).collect(Collectors.toSet()));
            this.shareGroupStatePartitionMetadata.put((Object)groupId, (Object)info);
        }
    }

    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
        TopicsDelta topicsDelta;
        this.metadataImage = newImage;
        if (this.lastMetadataImageWithNewTopics == -1L) {
            this.lastMetadataImageWithNewTopics = this.metadataImage.provenance().lastContainedOffset();
        }
        if ((topicsDelta = delta.topicsDelta()) == null) {
            return;
        }
        if (!topicsDelta.createdTopicIds().isEmpty()) {
            this.lastMetadataImageWithNewTopics = this.metadataImage.provenance().lastContainedOffset();
        }
        HashSet allGroupIds = new HashSet();
        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
            String topicName = topicDelta.name();
            this.topicHashCache.remove(topicName);
            allGroupIds.addAll(this.groupsSubscribedToTopic(topicName));
        });
        topicsDelta.deletedTopicIds().forEach(topicId -> {
            TopicImage topicImage = delta.image().topics().getTopic(topicId);
            String topicName = topicImage.name();
            this.topicHashCache.remove(topicName);
            allGroupIds.addAll(this.groupsSubscribedToTopic(topicName));
        });
        allGroupIds.forEach(groupId -> {
            Group group = (Group)this.groups.get(groupId);
            if (group != null) {
                group.requestMetadataRefresh();
            }
        });
    }

    public void updateGroupSizeCounter() {
        HashMap<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<ClassicGroupState, Long>();
        HashMap<ConsumerGroup.ConsumerGroupState, Long> consumerGroupSizeCounter = new HashMap<ConsumerGroup.ConsumerGroupState, Long>();
        HashMap<StreamsGroup.StreamsGroupState, Long> streamsGroupSizeCounter = new HashMap<StreamsGroup.StreamsGroupState, Long>();
        HashMap<ShareGroup.ShareGroupState, Long> shareGroupSizeCounter = new HashMap<ShareGroup.ShareGroupState, Long>();
        this.groups.forEach((__, group) -> {
            switch (group.type()) {
                case CLASSIC: {
                    classicGroupSizeCounter.compute(((ClassicGroup)group).currentState(), Utils::incValue);
                    break;
                }
                case CONSUMER: {
                    consumerGroupSizeCounter.compute(((ConsumerGroup)group).state(), Utils::incValue);
                    break;
                }
                case STREAMS: {
                    streamsGroupSizeCounter.compute(((StreamsGroup)group).state(), Utils::incValue);
                    break;
                }
                case SHARE: {
                    shareGroupSizeCounter.compute(((ShareGroup)group).state(), Utils::incValue);
                    break;
                }
            }
        });
        this.metrics.setClassicGroupGauges(classicGroupSizeCounter);
        this.metrics.setConsumerGroupGauges(consumerGroupSizeCounter);
        this.metrics.setStreamsGroupGauges(streamsGroupSizeCounter);
        this.metrics.setShareGroupGauges(shareGroupSizeCounter);
    }

    public void onLoaded() {
        HashMap<ClassicGroupState, Long> classicGroupSizeCounter = new HashMap<ClassicGroupState, Long>();
        this.groups.forEach((groupId, group) -> {
            switch (group.type()) {
                case STREAMS: {
                    StreamsGroup streamsGroup = (StreamsGroup)group;
                    this.log.info("Loaded streams group {} with {} members.", groupId, (Object)streamsGroup.members().size());
                    streamsGroup.members().forEach((memberId, member) -> {
                        this.log.debug("Loaded member {} in streams group {}.", memberId, groupId);
                        this.scheduleStreamsGroupSessionTimeout((String)groupId, (String)memberId);
                        if (member.state() == org.apache.kafka.coordinator.group.streams.MemberState.UNREVOKED_TASKS) {
                            this.scheduleStreamsGroupRebalanceTimeout((String)groupId, member.memberId(), member.memberEpoch(), member.rebalanceTimeoutMs());
                        }
                    });
                    break;
                }
                case CONSUMER: {
                    ConsumerGroup consumerGroup = (ConsumerGroup)group;
                    this.log.info("Loaded consumer group {} with {} members.", groupId, (Object)consumerGroup.members().size());
                    consumerGroup.members().forEach((memberId, member) -> {
                        this.log.debug("Loaded member {} in consumer group {}.", memberId, groupId);
                        this.scheduleConsumerGroupSessionTimeout((String)groupId, (String)memberId);
                        if (member.state() == MemberState.UNREVOKED_PARTITIONS) {
                            this.scheduleConsumerGroupRebalanceTimeout((String)groupId, member.memberId(), member.memberEpoch(), member.rebalanceTimeoutMs());
                        }
                    });
                    break;
                }
                case CLASSIC: {
                    ClassicGroup classicGroup = (ClassicGroup)group;
                    this.log.info("Loaded classic group {} with {} members.", groupId, (Object)classicGroup.allMembers().size());
                    classicGroup.allMembers().forEach(member -> {
                        this.log.debug("Loaded member {} in classic group {}.", (Object)member.memberId(), groupId);
                        this.rescheduleClassicGroupMemberHeartbeat(classicGroup, (ClassicGroupMember)member);
                    });
                    if (classicGroup.numMembers() > this.config.classicGroupMaxSize()) {
                        this.prepareRebalance(classicGroup, "Freshly-loaded group " + groupId + " (size " + classicGroup.numMembers() + ") is over capacity " + this.config.classicGroupMaxSize() + ". Rebalancing in order to give a chance for consumers to commit offsets");
                    }
                    classicGroupSizeCounter.compute(classicGroup.currentState(), Utils::incValue);
                    break;
                }
                case SHARE: {
                    ShareGroup shareGroup = (ShareGroup)group;
                    this.log.info("Loaded share group {} with {} members.", groupId, (Object)shareGroup.members().size());
                    shareGroup.members().forEach((memberId, member) -> {
                        this.log.debug("Loaded member {} in share group {}.", memberId, groupId);
                        this.scheduleShareGroupSessionTimeout((String)groupId, (String)memberId);
                    });
                    break;
                }
                default: {
                    this.log.warn("Loaded group {} with an unknown group type {}.", groupId, (Object)group.type());
                }
            }
        });
        this.metrics.setClassicGroupGauges(classicGroupSizeCounter);
    }

    public void onUnloaded() {
        this.groups.values().forEach(group -> {
            switch (group.type()) {
                case STREAMS: {
                    StreamsGroup streamsGroup = (StreamsGroup)group;
                    this.log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", (Object)streamsGroup.groupId(), (Object)streamsGroup.groupEpoch());
                    break;
                }
                case CONSUMER: {
                    ConsumerGroup consumerGroup = (ConsumerGroup)group;
                    this.log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", (Object)consumerGroup.groupId(), (Object)consumerGroup.groupEpoch());
                    break;
                }
                case CLASSIC: {
                    ClassicGroup classicGroup = (ClassicGroup)group;
                    this.log.info("[GroupId={}] Unloading group metadata for generation {}.", (Object)classicGroup.groupId(), (Object)classicGroup.generationId());
                    classicGroup.transitionTo(ClassicGroupState.DEAD);
                    switch (classicGroup.previousState()) {
                        case EMPTY: 
                        case DEAD: {
                            break;
                        }
                        case PREPARING_REBALANCE: {
                            classicGroup.allMembers().forEach(member -> classicGroup.completeJoinFuture((ClassicGroupMember)member, new JoinGroupResponseData().setMemberId(member.memberId()).setErrorCode(Errors.NOT_COORDINATOR.code())));
                            break;
                        }
                        case COMPLETING_REBALANCE: 
                        case STABLE: {
                            classicGroup.allMembers().forEach(member -> classicGroup.completeSyncFuture((ClassicGroupMember)member, new SyncGroupResponseData().setErrorCode(Errors.NOT_COORDINATOR.code())));
                        }
                    }
                    break;
                }
                case SHARE: {
                    ShareGroup shareGroup = (ShareGroup)group;
                    this.log.info("[GroupId={}] Unloaded group metadata for group epoch {}.", (Object)shareGroup.groupId(), (Object)shareGroup.groupEpoch());
                    break;
                }
                default: {
                    this.log.warn("onUnloaded group with an unknown group type {}.", (Object)group.type());
                }
            }
        });
    }

    public static String groupSessionTimeoutKey(String groupId, String memberId) {
        return "session-timeout-" + groupId + "-" + memberId;
    }

    public static String groupRebalanceTimeoutKey(String groupId, String memberId) {
        return "rebalance-timeout-" + groupId + "-" + memberId;
    }

    public void replay(GroupMetadataKey key, GroupMetadataValue value) {
        String groupId = key.group();
        if (value == null) {
            this.removeGroup(groupId);
        } else {
            ArrayList<ClassicGroupMember> loadedMembers = new ArrayList<ClassicGroupMember>();
            for (GroupMetadataValue.MemberMetadata member2 : value.members()) {
                int rebalanceTimeout = member2.rebalanceTimeout() == -1 ? member2.sessionTimeout() : member2.rebalanceTimeout();
                JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedProtocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
                supportedProtocols.add((ImplicitLinkedHashCollection.Element)new JoinGroupRequestData.JoinGroupRequestProtocol().setName(value.protocol()).setMetadata(member2.subscription()));
                ClassicGroupMember loadedMember = new ClassicGroupMember(member2.memberId(), Optional.ofNullable(member2.groupInstanceId()), member2.clientId(), member2.clientHost(), rebalanceTimeout, member2.sessionTimeout(), value.protocolType(), supportedProtocols, member2.assignment());
                loadedMembers.add(loadedMember);
            }
            String protocolType = value.protocolType();
            ClassicGroup classicGroup = new ClassicGroup(this.logContext, groupId, loadedMembers.isEmpty() ? ClassicGroupState.EMPTY : ClassicGroupState.STABLE, this.time, value.generation(), protocolType == null || protocolType.isEmpty() ? Optional.empty() : Optional.of(protocolType), Optional.ofNullable(value.protocol()), Optional.ofNullable(value.leader()), value.currentStateTimestamp() == -1L ? Optional.empty() : Optional.of(value.currentStateTimestamp()));
            loadedMembers.forEach(member -> classicGroup.add((ClassicGroupMember)member, null));
            this.groups.put((Object)groupId, (Object)classicGroup);
            classicGroup.setSubscribedTopics(classicGroup.computeSubscribedTopics());
        }
    }

    public CoordinatorResult<Void, CoordinatorRecord> classicGroupJoin(AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture<JoinGroupResponseData> responseFuture) {
        Group group = (Group)this.groups.get((Object)request.groupId(), Long.MAX_VALUE);
        if (group != null) {
            if (group.type() == Group.GroupType.CONSUMER && !group.isEmpty()) {
                return this.classicGroupJoinToConsumerGroup((ConsumerGroup)group, context, request, responseFuture);
            }
            if (group.type() == Group.GroupType.CONSUMER || group.type() == Group.GroupType.CLASSIC) {
                return this.classicGroupJoinToClassicGroup(context, request, responseFuture);
            }
            responseFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
            return EMPTY_RESULT;
        }
        return this.classicGroupJoinToClassicGroup(context, request, responseFuture);
    }

    CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToClassicGroup(AuthorizableRequestContext context, JoinGroupRequestData request, CompletableFuture<JoinGroupResponseData> responseFuture) {
        ClassicGroup group;
        CoordinatorResult<Void, CoordinatorRecord> result = EMPTY_RESULT;
        ArrayList<CoordinatorRecord> records = new ArrayList<CoordinatorRecord>();
        String groupId = request.groupId();
        String memberId = request.memberId();
        boolean isUnknownMember = memberId.equals("");
        if (this.maybeDeleteEmptyConsumerGroup(groupId, records)) {
            this.log.info("[GroupId {}] Converted the empty consumer group to a classic group.", (Object)groupId);
        }
        boolean isNewGroup = !this.groups.containsKey((Object)groupId);
        try {
            group = this.getOrMaybeCreateClassicGroup(groupId, isUnknownMember);
        }
        catch (GroupIdNotFoundException t2) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        }
        if (!this.acceptJoiningMember(group, memberId)) {
            group.remove(memberId);
            responseFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code()));
        } else {
            result = isUnknownMember ? this.classicGroupJoinNewMember(context, request, group, responseFuture) : this.classicGroupJoinExistingMember(context, request, group, responseFuture);
        }
        if (isNewGroup && result == EMPTY_RESULT) {
            CompletableFuture appendFuture = new CompletableFuture();
            appendFuture.whenComplete((__, t) -> {
                if (t != null) {
                    this.log.warn("Failed to write empty metadata for group {}: {}", (Object)group.groupId(), (Object)t.getMessage());
                    responseFuture.complete(new JoinGroupResponseData().setErrorCode(GroupMetadataManager.appendGroupMetadataErrorToResponseError(Errors.forException((Throwable)t)).code()));
                }
            });
            records.add(GroupCoordinatorRecordHelpers.newEmptyGroupMetadataRecord(group));
            return new CoordinatorResult(records, appendFuture, false);
        }
        return result;
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybeCompleteJoinPhase(ClassicGroup group) {
        if (!group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
            this.log.debug("Cannot complete join phase of group {} because the group is in {} state.", (Object)group.groupId(), (Object)group.currentState());
            return EMPTY_RESULT;
        }
        if (group.previousState() == ClassicGroupState.EMPTY) {
            this.log.debug("Cannot complete join phase of group {} because this is an initial rebalance.", (Object)group.groupId());
            return EMPTY_RESULT;
        }
        if (!group.hasAllMembersJoined()) {
            this.log.debug("Cannot complete join phase of group {} because not all the members have rejoined. Members={}, AwaitingJoinResponses={}, PendingJoinMembers={}.", new Object[]{group.groupId(), group.numMembers(), group.numAwaitingJoinResponse(), group.numPendingJoinMembers()});
            return EMPTY_RESULT;
        }
        return this.completeClassicGroupJoin(group);
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewMember(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, CompletableFuture<JoinGroupResponseData> responseFuture) {
        if (group.isInState(ClassicGroupState.DEAD)) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        } else if (!group.supportsProtocols(request.protocolType(), request.protocols())) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
        } else {
            Optional<String> groupInstanceId = Optional.ofNullable(request.groupInstanceId());
            String newMemberId = group.generateMemberId(context.clientId(), groupInstanceId);
            if (groupInstanceId.isPresent()) {
                return this.classicGroupJoinNewStaticMember(context, request, group, newMemberId, responseFuture);
            }
            return this.classicGroupJoinNewDynamicMember(context, request, group, newMemberId, responseFuture);
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewStaticMember(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String newMemberId, CompletableFuture<JoinGroupResponseData> responseFuture) {
        String groupInstanceId = request.groupInstanceId();
        String existingMemberId = group.staticMemberId(groupInstanceId);
        if (existingMemberId != null) {
            this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Replacing previously mapped member {} with this groupInstanceId.", new Object[]{groupInstanceId, group.groupId(), group.currentState(), existingMemberId});
            return this.updateStaticMemberThenRebalanceOrCompleteJoin(context, request, group, existingMemberId, newMemberId, responseFuture);
        }
        this.log.info("Static member with groupInstanceId={} and unknown member id joins group {} in {} state. Created a new member id {} for this member and added to the group.", new Object[]{groupInstanceId, group.groupId(), group.currentState(), newMemberId});
        return this.addMemberThenRebalanceOrCompleteJoin(context, request, group, newMemberId, responseFuture);
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinNewDynamicMember(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String newMemberId, CompletableFuture<JoinGroupResponseData> responseFuture) {
        if (!JoinGroupRequest.requiresKnownMemberId((int)context.requestVersion())) {
            this.log.info("Dynamic member with unknown member id joins group {} in state {}. Created a new member id {} and added the member to the group.", new Object[]{group.groupId(), group.currentState(), newMemberId});
            return this.addMemberThenRebalanceOrCompleteJoin(context, request, group, newMemberId, responseFuture);
        }
        this.log.info("Dynamic member with unknown member id joins group {} in {} state. Created a new member id {} and requesting the member to rejoin with this id.", new Object[]{group.groupId(), group.currentState(), newMemberId});
        group.addPendingMember(newMemberId);
        String classicGroupHeartbeatKey = GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), newMemberId);
        this.timer.schedule(classicGroupHeartbeatKey, (long)request.sessionTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> this.expireClassicGroupMemberHeartbeat(group.groupId(), newMemberId));
        responseFuture.complete(new JoinGroupResponseData().setMemberId(newMemberId).setErrorCode(Errors.MEMBER_ID_REQUIRED.code()));
        return EMPTY_RESULT;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinExistingMember(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, CompletableFuture<JoinGroupResponseData> responseFuture) {
        String memberId = request.memberId();
        String groupInstanceId = request.groupInstanceId();
        if (group.isInState(ClassicGroupState.DEAD)) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
            return EMPTY_RESULT;
        }
        if (!group.supportsProtocols(request.protocolType(), request.protocols())) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code()));
            return EMPTY_RESULT;
        }
        if (group.isPendingMember(memberId)) {
            if (groupInstanceId != null) {
                throw new IllegalStateException("Received unexpected JoinGroup with groupInstanceId=" + groupInstanceId + " for pending member with memberId=" + memberId);
            }
            this.log.info("Pending dynamic member with id {} joins group {} in {} state. Adding to the group now.", new Object[]{memberId, group.groupId(), group.currentState()});
            return this.addMemberThenRebalanceOrCompleteJoin(context, request, group, memberId, responseFuture);
        }
        try {
            group.validateMember(memberId, groupInstanceId, "join-group");
        }
        catch (KafkaException ex) {
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.forException((Throwable)ex).code()).setProtocolType(null).setProtocolName(null));
            return EMPTY_RESULT;
        }
        ClassicGroupMember member = group.member(memberId);
        if (group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
            return this.updateMemberThenRebalanceOrCompleteJoin(request, group, member, "Member " + member.memberId() + " is joining group during " + group.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request), responseFuture);
        }
        if (group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            if (!member.matches(request.protocols())) return this.updateMemberThenRebalanceOrCompleteJoin(request, group, member, "Updating metadata for member " + memberId + " during " + group.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request), responseFuture);
            responseFuture.complete(new JoinGroupResponseData().setMembers(group.isLeader(memberId) ? group.currentClassicGroupMembers() : List.of()).setMemberId(memberId).setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(group.leaderOrNull()).setSkipAssignment(false));
            return EMPTY_RESULT;
        } else if (group.isInState(ClassicGroupState.STABLE)) {
            if (group.isLeader(memberId)) {
                return this.updateMemberThenRebalanceOrCompleteJoin(request, group, member, "Leader " + memberId + " re-joining group during " + group.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request), responseFuture);
            }
            if (!member.matches(request.protocols())) {
                return this.updateMemberThenRebalanceOrCompleteJoin(request, group, member, "Updating metadata for member " + memberId + " during " + group.stateAsString() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request), responseFuture);
            }
            responseFuture.complete(new JoinGroupResponseData().setMembers(List.of()).setMemberId(memberId).setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(group.leaderOrNull()).setSkipAssignment(false));
            return EMPTY_RESULT;
        } else {
            this.log.warn("Attempt to add rejoining member {} of group {} in unexpected group state {}", new Object[]{memberId, group.groupId(), group.stateAsString()});
            responseFuture.complete(new JoinGroupResponseData().setMemberId(memberId).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> completeClassicGroupJoin(String groupId) {
        ClassicGroup group;
        try {
            group = this.getOrMaybeCreateClassicGroup(groupId, false);
        }
        catch (GroupIdNotFoundException exception) {
            this.log.debug("Cannot find the group, skipping rebalance stage.", (Throwable)exception);
            return EMPTY_RESULT;
        }
        return this.completeClassicGroupJoin(group);
    }

    private CoordinatorResult<Void, CoordinatorRecord> completeClassicGroupJoin(ClassicGroup group) {
        this.timer.cancel(GroupMetadataManager.classicGroupJoinKey(group.groupId()));
        String groupId = group.groupId();
        Map<String, ClassicGroupMember> notYetRejoinedDynamicMembers = group.notYetRejoinedMembers().entrySet().stream().filter(entry -> !((ClassicGroupMember)entry.getValue()).isStaticMember()).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (!notYetRejoinedDynamicMembers.isEmpty()) {
            notYetRejoinedDynamicMembers.values().forEach(failedMember -> {
                group.remove(failedMember.memberId());
                this.timer.cancel(GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), failedMember.memberId()));
            });
            this.log.info("Group {} removed dynamic members who haven't joined: {}", (Object)groupId, notYetRejoinedDynamicMembers.keySet());
        }
        if (group.isInState(ClassicGroupState.DEAD)) {
            this.log.info("Group {} is dead, skipping rebalance stage.", (Object)groupId);
        } else {
            if (!group.maybeElectNewJoinedLeader() && !group.allMembers().isEmpty()) {
                this.log.error("Group {} could not complete rebalance because no members rejoined.", (Object)groupId);
                this.timer.schedule(GroupMetadataManager.classicGroupJoinKey(groupId), (long)group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> this.completeClassicGroupJoin(group.groupId()));
                return EMPTY_RESULT;
            }
            group.initNextGeneration();
            if (group.isInState(ClassicGroupState.EMPTY)) {
                this.log.info("Group {} with generation {} is now empty.", (Object)groupId, (Object)group.generationId());
                CompletableFuture appendFuture = new CompletableFuture();
                appendFuture.whenComplete((__, t) -> {
                    if (t != null) {
                        Errors error = GroupMetadataManager.appendGroupMetadataErrorToResponseError(Errors.forException((Throwable)t));
                        this.log.warn("Failed to write empty metadata for group {}: {}", (Object)group.groupId(), (Object)error.message());
                    }
                });
                List<CoordinatorRecord> records = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, Map.of()));
                return new CoordinatorResult(records, appendFuture, false);
            }
            this.log.info("Stabilized group {} generation {} with {} members.", new Object[]{groupId, group.generationId(), group.numMembers()});
            group.allMembers().forEach(member -> {
                List<Object> members = List.of();
                if (group.isLeader(member.memberId())) {
                    members = group.currentClassicGroupMembers();
                }
                JoinGroupResponseData response = new JoinGroupResponseData().setMembers(members).setMemberId(member.memberId()).setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(group.leaderOrNull()).setSkipAssignment(false).setErrorCode(Errors.NONE.code());
                group.completeJoinFuture((ClassicGroupMember)member, response);
                this.rescheduleClassicGroupMemberHeartbeat(group, (ClassicGroupMember)member);
                member.setIsNew(false);
                group.addPendingSyncMember(member.memberId());
            });
            this.schedulePendingSync(group);
        }
        return EMPTY_RESULT;
    }

    private void schedulePendingSync(ClassicGroup group) {
        this.timer.schedule(GroupMetadataManager.classicGroupSyncKey(group.groupId()), (long)group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> this.expirePendingSync(group.groupId(), group.generationId()));
    }

    private CoordinatorResult<Void, CoordinatorRecord> expireClassicGroupMemberHeartbeat(String groupId, String memberId) {
        ClassicGroup group;
        try {
            group = this.getOrMaybeCreateClassicGroup(groupId, false);
        }
        catch (GroupIdNotFoundException exception) {
            this.log.debug("Received notification of heartbeat expiration for member {} after group {} had already been deleted or upgraded.", (Object)memberId, (Object)groupId);
            return EMPTY_RESULT;
        }
        if (group.isInState(ClassicGroupState.DEAD)) {
            this.log.info("Received notification of heartbeat expiration for member {} after group {} had already been unloaded or deleted.", (Object)memberId, (Object)group.groupId());
        } else {
            if (group.isPendingMember(memberId)) {
                this.log.info("Pending member {} in group {} has been removed after session timeout expiration.", (Object)memberId, (Object)group.groupId());
                return this.removePendingMemberAndUpdateClassicGroup(group, memberId);
            }
            if (!group.hasMember(memberId)) {
                this.log.debug("Member {} has already been removed from the group.", (Object)memberId);
            } else {
                ClassicGroupMember member = group.member(memberId);
                if (!member.hasSatisfiedHeartbeat()) {
                    this.log.info("Member {} in group {} has failed, removing it from the group.", (Object)member.memberId(), (Object)group.groupId());
                    return this.removeMemberAndUpdateClassicGroup(group, member, "removing member " + member.memberId() + " on heartbeat expiration.");
                }
            }
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> removeMemberAndUpdateClassicGroup(ClassicGroup group, ClassicGroupMember member, String reason) {
        group.completeJoinFuture(member, new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        group.remove(member.memberId());
        if (group.isInState(ClassicGroupState.STABLE) || group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            return this.maybePrepareRebalanceOrCompleteJoin(group, reason);
        }
        if (group.isInState(ClassicGroupState.PREPARING_REBALANCE) && group.hasAllMembersJoined()) {
            return this.completeClassicGroupJoin(group);
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> removePendingMemberAndUpdateClassicGroup(ClassicGroup group, String memberId) {
        group.remove(memberId);
        if (group.isInState(ClassicGroupState.PREPARING_REBALANCE) && group.hasAllMembersJoined()) {
            return this.completeClassicGroupJoin(group);
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> updateMemberThenRebalanceOrCompleteJoin(JoinGroupRequestData request, ClassicGroup group, ClassicGroupMember member, String joinReason, CompletableFuture<JoinGroupResponseData> responseFuture) {
        group.updateMember(member, request.protocols(), request.rebalanceTimeoutMs(), request.sessionTimeoutMs(), responseFuture);
        return this.maybePrepareRebalanceOrCompleteJoin(group, joinReason);
    }

    private CoordinatorResult<Void, CoordinatorRecord> addMemberThenRebalanceOrCompleteJoin(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String memberId, CompletableFuture<JoinGroupResponseData> responseFuture) {
        Optional<String> groupInstanceId = Optional.ofNullable(request.groupInstanceId());
        ClassicGroupMember member = new ClassicGroupMember(memberId, groupInstanceId, context.clientId(), context.clientAddress().toString(), request.rebalanceTimeoutMs(), request.sessionTimeoutMs(), request.protocolType(), request.protocols());
        member.setIsNew(true);
        if (group.isInState(ClassicGroupState.PREPARING_REBALANCE) && group.previousState() == ClassicGroupState.EMPTY) {
            group.setNewMemberAdded(true);
        }
        group.add(member, responseFuture);
        this.rescheduleClassicGroupMemberHeartbeat(group, member, this.config.classicGroupNewMemberJoinTimeoutMs());
        return this.maybePrepareRebalanceOrCompleteJoin(group, "Adding new member " + memberId + " with group instance id " + request.groupInstanceId() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request));
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybePrepareRebalanceOrCompleteJoin(ClassicGroup group, String reason) {
        if (group.canRebalance()) {
            return this.prepareRebalance(group, reason);
        }
        return this.maybeCompleteJoinPhase(group);
    }

    CoordinatorResult<Void, CoordinatorRecord> prepareRebalance(ClassicGroup group, String reason) {
        if (group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            this.resetAndPropagateAssignmentWithError(group, Errors.REBALANCE_IN_PROGRESS);
        }
        this.removeSyncExpiration(group);
        boolean isInitialRebalance = group.isInState(ClassicGroupState.EMPTY);
        if (isInitialRebalance) {
            int delayMs = this.config.classicGroupInitialRebalanceDelayMs();
            int remainingMs = Math.max(group.rebalanceTimeoutMs() - this.config.classicGroupInitialRebalanceDelayMs(), 0);
            this.timer.schedule(GroupMetadataManager.classicGroupJoinKey(group.groupId()), (long)delayMs, TimeUnit.MILLISECONDS, false, () -> this.tryCompleteInitialRebalanceElseSchedule(group.groupId(), delayMs, remainingMs));
        }
        group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        this.log.info("Preparing to rebalance group {} in state {} with old generation {} (reason: {}).", new Object[]{group.groupId(), group.currentState(), group.generationId(), reason});
        return isInitialRebalance ? EMPTY_RESULT : this.maybeCompleteJoinElseSchedule(group);
    }

    private CoordinatorResult<Void, CoordinatorRecord> maybeCompleteJoinElseSchedule(ClassicGroup group) {
        String classicGroupJoinKey = GroupMetadataManager.classicGroupJoinKey(group.groupId());
        if (group.hasAllMembersJoined()) {
            return this.completeClassicGroupJoin(group);
        }
        this.timer.schedule(classicGroupJoinKey, (long)group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, () -> this.completeClassicGroupJoin(group.groupId()));
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> tryCompleteInitialRebalanceElseSchedule(String groupId, int delayMs, int remainingMs) {
        ClassicGroup group;
        try {
            group = this.getOrMaybeCreateClassicGroup(groupId, false);
        }
        catch (GroupIdNotFoundException exception) {
            this.log.debug("Cannot find the group, skipping the initial rebalance stage.", (Throwable)exception);
            return EMPTY_RESULT;
        }
        if (!group.newMemberAdded() || remainingMs == 0) {
            return this.completeClassicGroupJoin(group);
        }
        group.setNewMemberAdded(false);
        int newDelayMs = Math.min(this.config.classicGroupInitialRebalanceDelayMs(), remainingMs);
        int newRemainingMs = Math.max(remainingMs - delayMs, 0);
        this.timer.schedule(GroupMetadataManager.classicGroupJoinKey(group.groupId()), (long)newDelayMs, TimeUnit.MILLISECONDS, false, () -> this.tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs));
        return EMPTY_RESULT;
    }

    private void resetAndPropagateAssignmentWithError(ClassicGroup group, Errors error) {
        if (!group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            throw new IllegalStateException("Group " + group.groupId() + " must be in " + ClassicGroupState.COMPLETING_REBALANCE.name() + " state but is in " + String.valueOf((Object)group.currentState()) + ".");
        }
        group.allMembers().forEach(member -> member.setAssignment(ClassicGroupMember.EMPTY_ASSIGNMENT));
        this.propagateAssignment(group, error);
    }

    private void setAndPropagateAssignment(ClassicGroup group, Map<String, byte[]> assignment) {
        if (!group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            throw new IllegalStateException("The group must be in CompletingRebalance state to set and propagate assignment.");
        }
        group.allMembers().forEach(member -> member.setAssignment(assignment.getOrDefault(member.memberId(), ClassicGroupMember.EMPTY_ASSIGNMENT)));
        this.propagateAssignment(group, Errors.NONE);
    }

    private void propagateAssignment(ClassicGroup group, Errors error) {
        Optional<Object> protocolName = Optional.empty();
        Optional<Object> protocolType = Optional.empty();
        if (error == Errors.NONE) {
            protocolName = group.protocolName();
            protocolType = group.protocolType();
        }
        for (ClassicGroupMember member : group.allMembers()) {
            if (!member.hasAssignment() && error == Errors.NONE) {
                this.log.warn("Sending empty assignment to member {} of {} for generation {} with no errors", new Object[]{member.memberId(), group.groupId(), group.generationId()});
            }
            if (!group.completeSyncFuture(member, new SyncGroupResponseData().setProtocolName((String)protocolName.orElse(null)).setProtocolType((String)protocolType.orElse(null)).setAssignment(member.assignment()).setErrorCode(error.code()))) continue;
            this.rescheduleClassicGroupMemberHeartbeat(group, member);
        }
    }

    public void rescheduleClassicGroupMemberHeartbeat(ClassicGroup group, ClassicGroupMember member) {
        this.rescheduleClassicGroupMemberHeartbeat(group, member, member.sessionTimeoutMs());
    }

    private void rescheduleClassicGroupMemberHeartbeat(ClassicGroup group, ClassicGroupMember member, long timeoutMs) {
        String classicGroupHeartbeatKey = GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), member.memberId());
        this.timer.schedule(classicGroupHeartbeatKey, timeoutMs, TimeUnit.MILLISECONDS, false, () -> this.expireClassicGroupMemberHeartbeat(group.groupId(), member.memberId()));
    }

    private void removeSyncExpiration(ClassicGroup group) {
        group.clearPendingSyncMembers();
        this.timer.cancel(GroupMetadataManager.classicGroupSyncKey(group.groupId()));
    }

    private CoordinatorResult<Void, CoordinatorRecord> expirePendingSync(String groupId, int generationId) {
        ClassicGroup group;
        try {
            group = this.getOrMaybeCreateClassicGroup(groupId, false);
        }
        catch (GroupIdNotFoundException exception) {
            this.log.debug("Received notification of sync expiration for an unknown classic group {}.", (Object)groupId);
            return EMPTY_RESULT;
        }
        if (generationId != group.generationId()) {
            this.log.error("Received unexpected notification of sync expiration for {} with an old generation {} while the group has {}.", new Object[]{group.groupId(), generationId, group.generationId()});
        } else if (group.isInState(ClassicGroupState.DEAD) || group.isInState(ClassicGroupState.EMPTY) || group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
            this.log.error("Received unexpected notification of sync expiration after group {} already transitioned to {} state.", (Object)group.groupId(), (Object)group.stateAsString());
        } else if ((group.isInState(ClassicGroupState.COMPLETING_REBALANCE) || group.isInState(ClassicGroupState.STABLE)) && !group.hasReceivedSyncFromAllMembers()) {
            HashSet<String> pendingSyncMembers = new HashSet<String>(group.allPendingSyncMembers());
            pendingSyncMembers.forEach(memberId -> {
                group.remove((String)memberId);
                this.timer.cancel(GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), memberId));
            });
            this.log.debug("Group {} removed members who haven't sent their sync requests: {}", (Object)group.groupId(), pendingSyncMembers);
            return this.prepareRebalance(group, "Removing " + String.valueOf(pendingSyncMembers) + " on pending sync request expiration");
        }
        return EMPTY_RESULT;
    }

    private boolean acceptJoiningMember(ClassicGroup group, String memberId) {
        switch (group.currentState()) {
            case EMPTY: 
            case DEAD: {
                return true;
            }
            case PREPARING_REBALANCE: {
                return group.hasMember(memberId) && group.member(memberId).isAwaitingJoin() || group.numAwaitingJoinResponse() < this.config.classicGroupMaxSize();
            }
            case COMPLETING_REBALANCE: 
            case STABLE: {
                return group.hasMember(memberId) || group.numMembers() < this.config.classicGroupMaxSize();
            }
        }
        throw new IllegalStateException("Unknown group state: " + group.stateAsString());
    }

    private CoordinatorResult<Void, CoordinatorRecord> updateStaticMemberThenRebalanceOrCompleteJoin(AuthorizableRequestContext context, JoinGroupRequestData request, ClassicGroup group, String oldMemberId, String newMemberId, CompletableFuture<JoinGroupResponseData> responseFuture) {
        String currentLeader = group.leaderOrNull();
        ClassicGroupMember newMember = group.replaceStaticMember(request.groupInstanceId(), oldMemberId, newMemberId);
        this.rescheduleClassicGroupMemberHeartbeat(group, newMember);
        int oldRebalanceTimeoutMs = newMember.rebalanceTimeoutMs();
        int oldSessionTimeoutMs = newMember.sessionTimeoutMs();
        JoinGroupRequestData.JoinGroupRequestProtocolCollection oldProtocols = newMember.supportedProtocols();
        group.updateMember(newMember, request.protocols(), request.rebalanceTimeoutMs(), request.sessionTimeoutMs(), responseFuture);
        if (group.isInState(ClassicGroupState.STABLE)) {
            String groupInstanceId = request.groupInstanceId();
            String selectedProtocolForNextGeneration = group.selectProtocol();
            if (group.protocolName().orElse("").equals(selectedProtocolForNextGeneration)) {
                this.log.info("Static member which joins during Stable stage and doesn't affect the selected protocol will not trigger a rebalance.");
                Map<String, byte[]> groupAssignment = group.groupAssignment();
                CompletableFuture appendFuture = new CompletableFuture();
                appendFuture.whenComplete((__, t) -> {
                    if (t != null) {
                        this.log.warn("Failed to persist metadata for group {} static member {} with group instance id {} due to {}. Reverting to old member id {}.", new Object[]{group.groupId(), newMemberId, groupInstanceId, t.getMessage(), oldMemberId});
                        group.updateMember(newMember, oldProtocols, oldRebalanceTimeoutMs, oldSessionTimeoutMs, null);
                        ClassicGroupMember oldMember = group.replaceStaticMember(groupInstanceId, newMemberId, oldMemberId);
                        this.rescheduleClassicGroupMemberHeartbeat(group, oldMember);
                        responseFuture.complete(new JoinGroupResponseData().setMemberId("").setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(currentLeader).setSkipAssignment(false).setErrorCode(GroupMetadataManager.appendGroupMetadataErrorToResponseError(Errors.forException((Throwable)t)).code()));
                    } else if (JoinGroupRequest.supportsSkippingAssignment((int)context.requestVersion())) {
                        boolean isLeader = group.isLeader(newMemberId);
                        group.completeJoinFuture(newMember, new JoinGroupResponseData().setMembers(isLeader ? group.currentClassicGroupMembers() : List.of()).setMemberId(newMemberId).setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(group.leaderOrNull()).setSkipAssignment(isLeader));
                    } else {
                        group.completeJoinFuture(newMember, new JoinGroupResponseData().setMemberId(newMemberId).setGenerationId(group.generationId()).setProtocolName((String)group.protocolName().orElse(null)).setProtocolType((String)group.protocolType().orElse(null)).setLeader(currentLeader).setSkipAssignment(false));
                    }
                });
                List<CoordinatorRecord> records = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, groupAssignment));
                return new CoordinatorResult(records, appendFuture, false);
            }
            return this.maybePrepareRebalanceOrCompleteJoin(group, "Group's selectedProtocol will change because static member " + newMember.memberId() + " with instance id " + groupInstanceId + " joined with change of protocol; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request));
        }
        if (group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            return this.prepareRebalance(group, "Updating metadata for static member " + newMember.memberId() + " with instance id " + request.groupInstanceId() + "; client reason: " + JoinGroupRequest.joinReason((JoinGroupRequestData)request));
        }
        if (group.isInState(ClassicGroupState.EMPTY) || group.isInState(ClassicGroupState.DEAD)) {
            throw new IllegalStateException("Group " + group.groupId() + " was not supposed to be in the state " + group.stateAsString() + " when the unknown static member " + request.groupInstanceId() + " rejoins.");
        }
        return this.maybeCompleteJoinPhase(group);
    }

    public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture) throws UnknownMemberIdException {
        Group group;
        try {
            group = this.group(request.groupId());
        }
        catch (GroupIdNotFoundException e) {
            responseFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        }
        if (group.isEmpty()) {
            responseFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            return EMPTY_RESULT;
        }
        if (group.type() == Group.GroupType.CLASSIC) {
            return this.classicGroupSyncToClassicGroup((ClassicGroup)group, context, request, responseFuture);
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return this.classicGroupSyncToConsumerGroup((ConsumerGroup)group, context, request, responseFuture);
        }
        responseFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToClassicGroup(ClassicGroup group, AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture) throws IllegalStateException {
        String groupId = request.groupId();
        String memberId = request.memberId();
        Optional<Errors> errorOpt = this.validateSyncGroup(group, request);
        if (errorOpt.isPresent()) {
            responseFuture.complete(new SyncGroupResponseData().setErrorCode(errorOpt.get().code()));
        } else if (group.isInState(ClassicGroupState.PREPARING_REBALANCE)) {
            responseFuture.complete(new SyncGroupResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
        } else if (group.isInState(ClassicGroupState.COMPLETING_REBALANCE)) {
            group.member(memberId).setAwaitingSyncFuture(responseFuture);
            this.removePendingSyncMember(group, request.memberId());
            if (group.isLeader(memberId)) {
                this.log.info("Assignment received from leader {} for group {} for generation {}. The group has {} members, {} of which are static.", new Object[]{memberId, groupId, group.generationId(), group.numMembers(), group.allStaticMemberIds().size()});
                HashMap<String, byte[]> assignment = new HashMap<String, byte[]>();
                request.assignments().forEach(memberAssignment -> assignment.put(memberAssignment.memberId(), memberAssignment.assignment()));
                HashMap membersWithMissingAssignment = new HashMap();
                group.allMembers().forEach(member -> {
                    if (!assignment.containsKey(member.memberId())) {
                        membersWithMissingAssignment.put(member.memberId(), ClassicGroupMember.EMPTY_ASSIGNMENT);
                    }
                });
                assignment.putAll(membersWithMissingAssignment);
                if (!membersWithMissingAssignment.isEmpty()) {
                    this.log.warn("Setting empty assignments for members {} of {} for generation {}.", new Object[]{membersWithMissingAssignment, groupId, group.generationId()});
                }
                CompletableFuture appendFuture = new CompletableFuture();
                appendFuture.whenComplete((__, t) -> {
                    if (group.isInState(ClassicGroupState.COMPLETING_REBALANCE) && request.generationId() == group.generationId()) {
                        if (t != null) {
                            Errors error = GroupMetadataManager.appendGroupMetadataErrorToResponseError(Errors.forException((Throwable)t));
                            this.resetAndPropagateAssignmentWithError(group, error);
                            this.maybePrepareRebalanceOrCompleteJoin(group, "Error " + String.valueOf(error) + " when storing group assignmentduring SyncGroup (member: " + memberId + ").");
                        } else {
                            this.setAndPropagateAssignment(group, assignment);
                            group.transitionTo(ClassicGroupState.STABLE);
                            this.metrics.record("CompletedRebalances");
                        }
                    }
                });
                List<CoordinatorRecord> records = List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignment));
                return new CoordinatorResult(records, appendFuture, false);
            }
        } else if (group.isInState(ClassicGroupState.STABLE)) {
            this.removePendingSyncMember(group, memberId);
            ClassicGroupMember member2 = group.member(memberId);
            responseFuture.complete(new SyncGroupResponseData().setProtocolType((String)group.protocolType().orElse(null)).setProtocolName((String)group.protocolName().orElse(null)).setAssignment(member2.assignment()).setErrorCode(Errors.NONE.code()));
        } else if (group.isInState(ClassicGroupState.DEAD)) {
            throw new IllegalStateException("Reached unexpected condition for Dead group " + groupId);
        }
        return EMPTY_RESULT;
    }

    private CoordinatorResult<Void, CoordinatorRecord> classicGroupSyncToConsumerGroup(ConsumerGroup group, AuthorizableRequestContext context, SyncGroupRequestData request, CompletableFuture<SyncGroupResponseData> responseFuture) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException, InconsistentGroupProtocolException, RebalanceInProgressException, IllegalStateException {
        String groupId = request.groupId();
        String memberId = request.memberId();
        String instanceId = request.groupInstanceId();
        ConsumerGroupMember member = this.validateConsumerGroupMember(group, memberId, instanceId);
        this.throwIfMemberDoesNotUseClassicProtocol(member);
        this.throwIfGenerationIdUnmatched(member.memberId(), member.memberEpoch(), request.generationId());
        this.throwIfClassicProtocolUnmatched(member, request.protocolType(), request.protocolName());
        this.throwIfRebalanceInProgress(group, member);
        CompletableFuture appendFuture = new CompletableFuture();
        appendFuture.whenComplete((__, t) -> {
            if (t == null) {
                this.cancelConsumerGroupSyncTimeout(groupId, memberId);
                this.scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
                responseFuture.complete(new SyncGroupResponseData().setProtocolType(request.protocolType()).setProtocolName(request.protocolName()).setAssignment(this.prepareAssignment(member)));
            }
        });
        return new CoordinatorResult(List.of(), appendFuture, false);
    }

    private byte[] prepareAssignment(ConsumerGroupMember member) {
        try {
            return ConsumerProtocol.serializeAssignment((ConsumerProtocolAssignment)Utils.toConsumerProtocolAssignment(member.assignedPartitions(), this.metadataImage.topics()), (short)ConsumerProtocol.deserializeVersion((ByteBuffer)ByteBuffer.wrap(member.classicMemberMetadata().get().supportedProtocols().iterator().next().metadata()))).array();
        }
        catch (SchemaException e) {
            throw new IllegalStateException("Malformed embedded consumer protocol in version deserialization.");
        }
    }

    static Errors appendGroupMetadataErrorToResponseError(Errors appendError) {
        switch (appendError) {
            case UNKNOWN_TOPIC_OR_PARTITION: 
            case NOT_ENOUGH_REPLICAS: 
            case REQUEST_TIMED_OUT: {
                return Errors.COORDINATOR_NOT_AVAILABLE;
            }
            case NOT_LEADER_OR_FOLLOWER: 
            case KAFKA_STORAGE_ERROR: {
                return Errors.NOT_COORDINATOR;
            }
            case MESSAGE_TOO_LARGE: 
            case RECORD_LIST_TOO_LARGE: 
            case INVALID_FETCH_SIZE: {
                return Errors.UNKNOWN_SERVER_ERROR;
            }
        }
        return appendError;
    }

    private Optional<Errors> validateSyncGroup(ClassicGroup group, SyncGroupRequestData request) {
        if (group.isInState(ClassicGroupState.DEAD)) {
            return Optional.of(Errors.COORDINATOR_NOT_AVAILABLE);
        }
        try {
            group.validateMember(request.memberId(), request.groupInstanceId(), "sync-group");
        }
        catch (KafkaException ex) {
            return Optional.of(Errors.forException((Throwable)ex));
        }
        if (request.generationId() != group.generationId()) {
            return Optional.of(Errors.ILLEGAL_GENERATION);
        }
        if (this.isProtocolInconsistent(request.protocolType(), group.protocolType().orElse(null)) || this.isProtocolInconsistent(request.protocolName(), group.protocolName().orElse(null))) {
            return Optional.of(Errors.INCONSISTENT_GROUP_PROTOCOL);
        }
        return Optional.empty();
    }

    private void removePendingSyncMember(ClassicGroup group, String memberId) {
        group.removePendingSyncMember(memberId);
        String syncKey = GroupMetadataManager.classicGroupSyncKey(group.groupId());
        switch (group.currentState()) {
            case EMPTY: 
            case DEAD: 
            case PREPARING_REBALANCE: {
                this.timer.cancel(syncKey);
                break;
            }
            case COMPLETING_REBALANCE: 
            case STABLE: {
                if (!group.hasReceivedSyncFromAllMembers()) break;
                this.timer.cancel(syncKey);
                break;
            }
            default: {
                throw new IllegalStateException("Unknown group state: " + group.stateAsString());
            }
        }
    }

    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeat(AuthorizableRequestContext context, HeartbeatRequestData request) {
        Group group;
        try {
            group = this.group(request.groupId());
        }
        catch (GroupIdNotFoundException e) {
            throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
        }
        if (group.type() == Group.GroupType.CLASSIC) {
            return this.classicGroupHeartbeatToClassicGroup((ClassicGroup)group, context, request);
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return this.classicGroupHeartbeatToConsumerGroup((ConsumerGroup)group, context, request);
        }
        throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
    }

    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToClassicGroup(ClassicGroup group, AuthorizableRequestContext context, HeartbeatRequestData request) {
        this.validateClassicGroupHeartbeat(group, request.memberId(), request.groupInstanceId(), request.generationId());
        switch (group.currentState()) {
            case EMPTY: {
                return new CoordinatorResult(List.of(), (Object)new HeartbeatResponseData().setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
            }
            case PREPARING_REBALANCE: {
                this.rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
                return new CoordinatorResult(List.of(), (Object)new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
            }
            case COMPLETING_REBALANCE: 
            case STABLE: {
                this.rescheduleClassicGroupMemberHeartbeat(group, group.member(request.memberId()));
                return new CoordinatorResult(List.of(), (Object)new HeartbeatResponseData());
            }
        }
        throw new IllegalStateException("Reached unexpected state " + String.valueOf((Object)group.currentState()) + " for group " + group.groupId());
    }

    private void validateClassicGroupHeartbeat(ClassicGroup group, String memberId, String groupInstanceId, int generationId) throws CoordinatorNotAvailableException, IllegalGenerationException {
        if (group.isInState(ClassicGroupState.DEAD)) {
            throw Errors.COORDINATOR_NOT_AVAILABLE.exception();
        }
        group.validateMember(memberId, groupInstanceId, "heartbeat");
        if (generationId != group.generationId()) {
            throw Errors.ILLEGAL_GENERATION.exception();
        }
    }

    private CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupHeartbeatToConsumerGroup(ConsumerGroup group, AuthorizableRequestContext context, HeartbeatRequestData request) throws UnknownMemberIdException, FencedInstanceIdException, IllegalGenerationException {
        String groupId = request.groupId();
        String memberId = request.memberId();
        String instanceId = request.groupInstanceId();
        ConsumerGroupMember member = this.validateConsumerGroupMember(group, memberId, instanceId);
        this.throwIfMemberDoesNotUseClassicProtocol(member);
        this.throwIfGenerationIdUnmatched(memberId, member.memberEpoch(), request.generationId());
        this.scheduleConsumerGroupSessionTimeout(groupId, memberId, member.classicProtocolSessionTimeout().get());
        Errors error = Errors.NONE;
        if (member.memberEpoch() < group.groupEpoch() || member.state() == MemberState.UNREVOKED_PARTITIONS || member.state() == MemberState.UNRELEASED_PARTITIONS && !group.waitingOnUnreleasedPartition(member)) {
            error = Errors.REBALANCE_IN_PROGRESS;
            this.scheduleConsumerGroupJoinTimeoutIfAbsent(groupId, memberId, member.rebalanceTimeoutMs());
        }
        return new CoordinatorResult(List.of(), (Object)new HeartbeatResponseData().setErrorCode(error.code()));
    }

    private ConsumerGroupMember validateConsumerGroupMember(ConsumerGroup group, String memberId, String instanceId) throws UnknownMemberIdException, FencedInstanceIdException {
        ConsumerGroupMember member;
        if (instanceId == null) {
            member = group.getOrMaybeCreateMember(memberId, false);
        } else {
            member = group.staticMember(instanceId);
            if (member == null) {
                throw new UnknownMemberIdException(String.format("Member with instance id %s is not a member of group %s.", instanceId, group.groupId()));
            }
            this.throwIfInstanceIdIsFenced(member, group.groupId(), memberId, instanceId);
        }
        return member;
    }

    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(AuthorizableRequestContext context, LeaveGroupRequestData request) throws UnknownMemberIdException {
        Group group;
        try {
            group = this.group(request.groupId());
        }
        catch (GroupIdNotFoundException e) {
            throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
        }
        if (group.type() == Group.GroupType.CLASSIC) {
            return this.classicGroupLeaveToClassicGroup((ClassicGroup)group, request);
        }
        if (group.type() == Group.GroupType.CONSUMER) {
            return this.classicGroupLeaveToConsumerGroup((ConsumerGroup)group, request);
        }
        throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
    }

    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToConsumerGroup(ConsumerGroup group, LeaveGroupRequestData request) throws UnknownMemberIdException {
        String groupId = group.groupId();
        ArrayList<LeaveGroupResponseData.MemberResponse> memberResponses = new ArrayList<LeaveGroupResponseData.MemberResponse>();
        HashSet<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<ConsumerGroupMember>();
        for (LeaveGroupRequestData.MemberIdentity memberIdentity : request.members()) {
            String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";
            try {
                ConsumerGroupMember member;
                if (memberIdentity.groupInstanceId() == null) {
                    member = group.getOrMaybeCreateMember(memberIdentity.memberId(), false);
                    this.log.info("[GroupId {}] Dynamic member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{groupId, memberIdentity.memberId(), reason});
                } else {
                    member = group.staticMember(memberIdentity.groupInstanceId());
                    this.throwIfStaticMemberIsUnknown(member, memberIdentity.groupInstanceId());
                    if (!"".equals(memberIdentity.memberId())) {
                        this.throwIfInstanceIdIsFenced(member, groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId());
                    }
                    this.log.info("[GroupId {}] Static member {} with instance id {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{groupId, memberIdentity.memberId(), memberIdentity.groupInstanceId(), reason});
                }
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()));
                validLeaveGroupMembers.add(member);
            }
            catch (KafkaException e) {
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(memberIdentity.memberId()).setGroupInstanceId(memberIdentity.groupInstanceId()).setErrorCode(Errors.forException((Throwable)e).code()));
            }
        }
        return this.consumerGroupFenceMembers(group, validLeaveGroupMembers, new LeaveGroupResponseData().setMembers(memberResponses));
    }

    private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(ClassicGroup group, LeaveGroupRequestData request) throws UnknownMemberIdException {
        if (group.isInState(ClassicGroupState.DEAD)) {
            return new CoordinatorResult(List.of(), (Object)new LeaveGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()));
        }
        ArrayList<LeaveGroupResponseData.MemberResponse> memberResponses = new ArrayList<LeaveGroupResponseData.MemberResponse>();
        for (LeaveGroupRequestData.MemberIdentity member : request.members()) {
            String reason;
            String string = reason = member.reason() != null ? member.reason() : "not provided";
            if ("".equals(member.memberId())) {
                if (member.groupInstanceId() != null && group.hasStaticMember(member.groupInstanceId())) {
                    this.removeCurrentMemberFromClassicGroup(group, group.staticMemberId(member.groupInstanceId()), reason);
                    memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()));
                    continue;
                }
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()).setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
                continue;
            }
            if (group.isPendingMember(member.memberId())) {
                group.remove(member.memberId());
                this.timer.cancel(GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), member.memberId()));
                this.log.info("[Group {}] Pending member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{group.groupId(), member.memberId(), reason});
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()));
                continue;
            }
            try {
                group.validateMember(member.memberId(), member.groupInstanceId(), "leave-group");
                this.removeCurrentMemberFromClassicGroup(group, member.memberId(), reason);
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()));
            }
            catch (KafkaException e) {
                memberResponses.add(new LeaveGroupResponseData.MemberResponse().setMemberId(member.memberId()).setGroupInstanceId(member.groupInstanceId()).setErrorCode(Errors.forException((Throwable)e).code()));
            }
        }
        List<String> validLeaveGroupMembers = memberResponses.stream().filter(response -> response.errorCode() == Errors.NONE.code()).map(LeaveGroupResponseData.MemberResponse::memberId).toList();
        String reason = "explicit `LeaveGroup` request for (" + String.join((CharSequence)", ", validLeaveGroupMembers) + ") members.";
        CoordinatorResult<Void, CoordinatorRecord> coordinatorResult = EMPTY_RESULT;
        if (!validLeaveGroupMembers.isEmpty()) {
            switch (group.currentState()) {
                case COMPLETING_REBALANCE: 
                case STABLE: {
                    coordinatorResult = this.maybePrepareRebalanceOrCompleteJoin(group, reason);
                    break;
                }
                case PREPARING_REBALANCE: {
                    coordinatorResult = this.maybeCompleteJoinPhase(group);
                    break;
                }
            }
        }
        return new CoordinatorResult(coordinatorResult.records(), (Object)new LeaveGroupResponseData().setMembers(memberResponses), coordinatorResult.appendFuture(), coordinatorResult.replayRecords());
    }

    private void removeCurrentMemberFromClassicGroup(ClassicGroup group, String memberId, String reason) {
        ClassicGroupMember member = group.member(memberId);
        this.timer.cancel(GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), memberId));
        this.log.info("[Group {}] Member {} has left group through explicit `LeaveGroup` request; client reason: {}", new Object[]{group.groupId(), memberId, reason});
        group.completeJoinFuture(member, new JoinGroupResponseData().setMemberId("").setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
        group.remove(member.memberId());
    }

    public void createGroupTombstoneRecords(String groupId, List<CoordinatorRecord> records) {
        this.createGroupTombstoneRecords(this.group(groupId), records);
    }

    public void createGroupTombstoneRecords(Group group, List<CoordinatorRecord> records) {
        group.createGroupTombstoneRecords(records);
    }

    public Optional<DeleteShareGroupStateParameters> shareGroupBuildPartitionDeleteRequest(String shareGroupId, List<CoordinatorRecord> records) {
        Map<Uuid, ShareGroup.InitMapValue> deletingTopics;
        if (!this.shareGroupStatePartitionMetadata.containsKey((Object)shareGroupId)) {
            return Optional.empty();
        }
        Map<Uuid, ShareGroup.InitMapValue> deleteCandidates = GroupMetadataManager.combineInitMaps(((ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)shareGroupId)).initializedTopics(), ((ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)shareGroupId)).initializingTopics());
        Set<Uuid> deletingCurrent = ((ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)shareGroupId)).deletingTopics();
        if (this.metadataImage != null && !this.metadataImage.equals((Object)MetadataImage.EMPTY) && !(deletingTopics = deletingCurrent.stream().map(tid -> this.metadataImage.topics().getTopic(tid)).filter(Objects::nonNull).map(image -> Map.entry(image.id(), new ShareGroup.InitMapValue(image.name(), image.partitions().keySet(), -1L))).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).isEmpty()) {
            this.log.info("Existing deleting entries found in share group {} - {}", (Object)shareGroupId, deletingTopics);
            deleteCandidates = GroupMetadataManager.combineInitMaps(deleteCandidates, deletingTopics);
        }
        if (deleteCandidates.isEmpty() && deletingCurrent.isEmpty()) {
            return Optional.empty();
        }
        ArrayList<TopicData> topicDataList = new ArrayList<TopicData>(deleteCandidates.size());
        for (Map.Entry<Uuid, ShareGroup.InitMapValue> entry : deleteCandidates.entrySet()) {
            topicDataList.add(new TopicData(entry.getKey(), entry.getValue().partitions().stream().map(PartitionFactory::newPartitionIdData).toList()));
        }
        records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(shareGroupId, Map.of(), Map.of(), this.attachTopicName(deleteCandidates.keySet())));
        if (topicDataList.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(new DeleteShareGroupStateParameters.Builder().setGroupTopicPartitionData(new GroupTopicPartitionData.Builder().setGroupId(shareGroupId).setTopicsData(topicDataList).build()).build());
    }

    public List<DeleteShareGroupStateRequestData.DeleteStateData> sharePartitionsEligibleForOffsetDeletion(String groupId, DeleteShareGroupOffsetsRequestData requestData, List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> errorTopicResponseList, List<CoordinatorRecord> records) {
        ArrayList<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<DeleteShareGroupStateRequestData.DeleteStateData>();
        ShareGroup.ShareGroupStatePartitionMetadataInfo currentMap = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        if (currentMap == null) {
            return deleteShareGroupStateRequestTopicsData;
        }
        HashMap<Uuid, ShareGroup.InitMapValue> initializedTopics = new HashMap<Uuid, ShareGroup.InitMapValue>();
        currentMap.initializedTopics().forEach((topicId, initValue) -> initializedTopics.put((Uuid)topicId, new ShareGroup.InitMapValue(initValue.name(), new HashSet<Integer>(initValue.partitions()), initValue.timestamp())));
        HashSet<Uuid> deletingTopics = new HashSet<Uuid>(currentMap.deletingTopics());
        requestData.topics().forEach(topic -> {
            TopicImage topicImage = this.metadataImage.topics().getTopic(topic.topicName());
            if (topicImage != null) {
                Uuid topicId = topicImage.id();
                if (initializedTopics.containsKey(topicId)) {
                    ArrayList partitions = new ArrayList();
                    ((ShareGroup.InitMapValue)initializedTopics.get(topicId)).partitions().forEach(partition -> partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition.intValue())));
                    deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(partitions));
                    initializedTopics.remove(topicId);
                    deletingTopics.add(topicId);
                } else if (deletingTopics.contains(topicId)) {
                    ArrayList partitions = new ArrayList();
                    topicImage.partitions().keySet().forEach(partition -> partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition.intValue())));
                    deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData().setTopicId(topicId).setPartitions(partitions));
                } else {
                    errorTopicResponseList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage("There is no offset information to delete."));
                }
            } else {
                errorTopicResponseList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
            }
        });
        records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, currentMap.initializingTopics(), initializedTopics, this.attachTopicName(deletingTopics)));
        return deleteShareGroupStateRequestTopicsData;
    }

    public Map.Entry<AlterShareGroupOffsetsResponseData, InitializeShareGroupStateParameters> completeAlterShareGroupOffsets(String groupId, AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest, List<CoordinatorRecord> records) {
        long currentTimeMs = this.time.milliseconds();
        Group group = (Group)this.groups.get((Object)groupId);
        AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection alterShareGroupOffsetsResponseTopics = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection();
        HashMap<Uuid, ShareGroup.InitMapValue> initializingTopics = new HashMap<Uuid, ShareGroup.InitMapValue>();
        HashMap<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new HashMap<Uuid, Map<Integer, Long>>();
        alterShareGroupOffsetsRequest.topics().forEach(topic -> {
            TopicImage topicImage = this.metadataImage.topics().getTopic(topic.topicName());
            if (topicImage != null) {
                Uuid topicId = topicImage.id();
                HashSet existingPartitions = new HashSet(topicImage.partitions().keySet());
                ArrayList partitions = new ArrayList();
                topic.partitions().forEach(partition -> {
                    if (existingPartitions.contains(partition.partitionIndex())) {
                        partitions.add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(partition.partitionIndex()).setErrorCode(Errors.NONE.code()));
                        offsetByTopicPartitions.computeIfAbsent(topicId, k -> new HashMap()).put(partition.partitionIndex(), partition.startOffset());
                    } else {
                        partitions.add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(partition.partitionIndex()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()));
                    }
                });
                initializingTopics.put(topicId, new ShareGroup.InitMapValue(topic.topicName(), topic.partitions().stream().map(AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestPartition::partitionIndex).filter(existingPartitions::contains).collect(Collectors.toSet()), currentTimeMs));
                alterShareGroupOffsetsResponseTopics.add((ImplicitLinkedHashCollection.Element)new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setTopicId(topicId).setPartitions(partitions));
            } else {
                ArrayList partitions = new ArrayList();
                topic.partitions().forEach(partition -> partitions.add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition().setPartitionIndex(partition.partitionIndex()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())));
                alterShareGroupOffsetsResponseTopics.add((ImplicitLinkedHashCollection.Element)new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic().setTopicName(topic.topicName()).setPartitions(partitions));
            }
        });
        this.addInitializingTopicsRecords(groupId, records, initializingTopics);
        return Map.entry(new AlterShareGroupOffsetsResponseData().setResponses(alterShareGroupOffsetsResponseTopics), this.buildInitializeShareGroupState(groupId, ((ShareGroup)group).groupEpoch(), offsetByTopicPartitions));
    }

    public CoordinatorResult<Void, CoordinatorRecord> maybeCleanupShareGroupState(Set<Uuid> deletedTopicIds) {
        if (deletedTopicIds.isEmpty()) {
            return new CoordinatorResult(List.of());
        }
        ArrayList records = new ArrayList();
        this.shareGroupStatePartitionMetadata.forEach((groupId, metadata) -> {
            HashSet<Uuid> initializingDeletedCurrent = new HashSet<Uuid>(metadata.initializingTopics().keySet());
            HashSet<Uuid> initializedDeletedCurrent = new HashSet<Uuid>(metadata.initializedTopics().keySet());
            initializingDeletedCurrent.retainAll(deletedTopicIds);
            initializedDeletedCurrent.retainAll(deletedTopicIds);
            if (initializingDeletedCurrent.isEmpty() && initializedDeletedCurrent.isEmpty()) {
                return;
            }
            HashMap<Uuid, ShareGroup.InitMapValue> finalInitializing = new HashMap<Uuid, ShareGroup.InitMapValue>(metadata.initializingTopics());
            initializingDeletedCurrent.forEach(finalInitializing::remove);
            HashMap<Uuid, ShareGroup.InitMapValue> finalInitialized = new HashMap<Uuid, ShareGroup.InitMapValue>(metadata.initializedTopics());
            initializedDeletedCurrent.forEach(finalInitialized::remove);
            HashSet<Uuid> deletingTopics = new HashSet<Uuid>(metadata.deletingTopics());
            deletingTopics.removeAll(deletedTopicIds);
            records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, finalInitializing, finalInitialized, this.attachTopicName(deletingTopics)));
        });
        return new CoordinatorResult(records);
    }

    public List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> completeDeleteShareGroupOffsets(String groupId, Map<Uuid, String> topics, List<CoordinatorRecord> records) {
        ShareGroup.ShareGroupStatePartitionMetadataInfo currentMap = (ShareGroup.ShareGroupStatePartitionMetadataInfo)this.shareGroupStatePartitionMetadata.get((Object)groupId);
        if (currentMap == null) {
            return List.of();
        }
        HashSet<Uuid> updatedDeletingTopics = new HashSet<Uuid>(currentMap.deletingTopics());
        topics.keySet().forEach(updatedDeletingTopics::remove);
        records.add(GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord(groupId, currentMap.initializingTopics(), currentMap.initializedTopics(), this.attachTopicName(updatedDeletingTopics)));
        return topics.entrySet().stream().map(entry -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic().setTopicId((Uuid)entry.getKey()).setTopicName((String)entry.getValue()).setErrorCode(Errors.NONE.code()).setErrorMessage(null)).toList();
    }

    void validateDeleteGroup(String groupId) throws ApiException {
        Group group = this.group(groupId);
        group.validateDeleteGroup();
    }

    public void maybeDeleteGroup(String groupId, List<CoordinatorRecord> records) {
        Group group = (Group)this.groups.get((Object)groupId);
        if (group != null && group.isEmpty()) {
            this.createGroupTombstoneRecords(groupId, records);
        }
    }

    private static boolean isEmptyClassicGroup(Group group) {
        return group != null && group.type() == Group.GroupType.CLASSIC && group.isEmpty();
    }

    private static boolean isEmptyConsumerGroup(Group group) {
        return group != null && group.type() == Group.GroupType.CONSUMER && group.isEmpty();
    }

    private boolean maybeDeleteEmptyClassicGroup(Group group, List<CoordinatorRecord> records) {
        if (GroupMetadataManager.isEmptyClassicGroup(group)) {
            this.createGroupTombstoneRecords(group, records);
            return true;
        }
        return false;
    }

    private boolean maybeDeleteEmptyConsumerGroup(String groupId, List<CoordinatorRecord> records) {
        Group group = (Group)this.groups.get((Object)groupId, Long.MAX_VALUE);
        if (GroupMetadataManager.isEmptyConsumerGroup(group)) {
            this.createGroupTombstoneRecords(group, records);
            this.removeGroup(groupId);
            return true;
        }
        return false;
    }

    private boolean isProtocolInconsistent(String protocolTypeOrName, String groupProtocolTypeOrName) {
        return protocolTypeOrName != null && groupProtocolTypeOrName != null && !groupProtocolTypeOrName.equals(protocolTypeOrName);
    }

    public Set<String> groupIds() {
        return Collections.unmodifiableSet(this.groups.keySet());
    }

    Map<String, Long> topicHashCache() {
        return Collections.unmodifiableMap(this.topicHashCache);
    }

    private int consumerGroupSessionTimeoutMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::consumerSessionTimeoutMs).orElse(this.config.consumerGroupSessionTimeoutMs());
    }

    private int consumerGroupHeartbeatIntervalMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::consumerHeartbeatIntervalMs).orElse(this.config.consumerGroupHeartbeatIntervalMs());
    }

    private int shareGroupSessionTimeoutMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::shareSessionTimeoutMs).orElse(this.config.shareGroupSessionTimeoutMs());
    }

    private int shareGroupHeartbeatIntervalMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::shareHeartbeatIntervalMs).orElse(this.config.shareGroupHeartbeatIntervalMs());
    }

    private int streamsGroupSessionTimeoutMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::streamsSessionTimeoutMs).orElse(this.config.streamsGroupSessionTimeoutMs());
    }

    private int streamsGroupHeartbeatIntervalMs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        return groupConfig.map(GroupConfig::streamsHeartbeatIntervalMs).orElse(this.config.streamsGroupHeartbeatIntervalMs());
    }

    private TaskAssignor streamsGroupAssignor(String groupId) {
        return this.streamsGroupAssignors.get("sticky");
    }

    private Map<String, String> streamsGroupAssignmentConfigs(String groupId) {
        Optional<GroupConfig> groupConfig = this.groupConfigManager.groupConfig(groupId);
        Integer numStandbyReplicas = groupConfig.map(GroupConfig::streamsNumStandbyReplicas).orElse(this.config.streamsGroupNumStandbyReplicas());
        return Map.of("num.standby.replicas", numStandbyReplicas.toString());
    }

    static String classicGroupHeartbeatKey(String groupId, String memberId) {
        return "heartbeat-" + groupId + "-" + memberId;
    }

    static String classicGroupJoinKey(String groupId) {
        return "join-" + groupId;
    }

    static String classicGroupSyncKey(String groupId) {
        return "sync-" + groupId;
    }

    static String consumerGroupJoinKey(String groupId, String memberId) {
        return "join-" + groupId + "-" + memberId;
    }

    static String consumerGroupSyncKey(String groupId, String memberId) {
        return "sync-" + groupId + "-" + memberId;
    }

    Map<String, ShareGroup.ShareGroupStatePartitionMetadataInfo> shareGroupStatePartitionMetadata() {
        return this.shareGroupStatePartitionMetadata;
    }

    private static class UpdateSubscriptionMetadataResult {
        private final int groupEpoch;
        private final SubscriptionType subscriptionType;

        UpdateSubscriptionMetadataResult(int groupEpoch, SubscriptionType subscriptionType) {
            this.groupEpoch = groupEpoch;
            this.subscriptionType = Objects.requireNonNull(subscriptionType);
        }
    }

    public static class Builder {
        private LogContext logContext = null;
        private SnapshotRegistry snapshotRegistry = null;
        private Time time = null;
        private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
        private CoordinatorExecutor<CoordinatorRecord> executor = null;
        private GroupCoordinatorConfig config = null;
        private GroupConfigManager groupConfigManager = null;
        private MetadataImage metadataImage = null;
        private ShareGroupPartitionAssignor shareGroupAssignor = null;
        private GroupCoordinatorMetricsShard metrics;
        private Optional<Plugin<Authorizer>> authorizerPlugin = null;
        private List<TaskAssignor> streamsGroupAssignors = null;

        Builder withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        Builder withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
            this.snapshotRegistry = snapshotRegistry;
            return this;
        }

        Builder withTime(Time time) {
            this.time = time;
            return this;
        }

        Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
            this.timer = timer;
            return this;
        }

        Builder withExecutor(CoordinatorExecutor<CoordinatorRecord> executor) {
            this.executor = executor;
            return this;
        }

        Builder withConfig(GroupCoordinatorConfig config) {
            this.config = config;
            return this;
        }

        Builder withGroupConfigManager(GroupConfigManager groupConfigManager) {
            this.groupConfigManager = groupConfigManager;
            return this;
        }

        Builder withStreamsGroupAssignors(List<TaskAssignor> streamsGroupAssignors) {
            this.streamsGroupAssignors = streamsGroupAssignors;
            return this;
        }

        Builder withMetadataImage(MetadataImage metadataImage) {
            this.metadataImage = metadataImage;
            return this;
        }

        Builder withGroupCoordinatorMetricsShard(GroupCoordinatorMetricsShard metrics) {
            this.metrics = metrics;
            return this;
        }

        Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssignor) {
            this.shareGroupAssignor = shareGroupAssignor;
            return this;
        }

        Builder withAuthorizerPlugin(Optional<Plugin<Authorizer>> authorizerPlugin) {
            this.authorizerPlugin = authorizerPlugin;
            return this;
        }

        GroupMetadataManager build() {
            if (this.logContext == null) {
                this.logContext = new LogContext();
            }
            if (this.snapshotRegistry == null) {
                this.snapshotRegistry = new SnapshotRegistry(this.logContext);
            }
            if (this.metadataImage == null) {
                this.metadataImage = MetadataImage.EMPTY;
            }
            if (this.time == null) {
                this.time = Time.SYSTEM;
            }
            if (this.authorizerPlugin == null) {
                this.authorizerPlugin = Optional.empty();
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            if (this.executor == null) {
                throw new IllegalArgumentException("Executor must be set.");
            }
            if (this.config == null) {
                throw new IllegalArgumentException("Config must be set.");
            }
            if (this.shareGroupAssignor == null) {
                this.shareGroupAssignor = new SimpleAssignor();
            }
            if (this.metrics == null) {
                throw new IllegalArgumentException("GroupCoordinatorMetricsShard must be set.");
            }
            if (this.groupConfigManager == null) {
                throw new IllegalArgumentException("GroupConfigManager must be set.");
            }
            if (this.streamsGroupAssignors == null) {
                this.streamsGroupAssignors = List.of(new StickyTaskAssignor());
            }
            return new GroupMetadataManager(this.snapshotRegistry, this.logContext, this.time, this.timer, this.executor, this.metrics, this.metadataImage, this.config, this.groupConfigManager, this.shareGroupAssignor, this.authorizerPlugin, this.streamsGroupAssignors);
        }
    }
}

