From 11ab2d793b1402e9e69eced68eb9d213de162db0 Mon Sep 17 00:00:00 2001 From: Trygve Aasjord Date: Thu, 14 May 2026 08:43:48 +0200 Subject: [PATCH] Add configuration property for leaveGroupOnClose in Kafka Streams This makes it possible to set the option to leave the group on close at configuration/deployment time in the application properties. Signed-off-by: Trygve Aasjord --- .../boot/kafka/autoconfigure/KafkaProperties.java | 13 +++++++++++++ .../KafkaStreamsAnnotationDrivenConfiguration.java | 1 + 2 files changed, 14 insertions(+) diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java index 172ffd707808..f9bbf7c778c5 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaProperties.java @@ -812,6 +812,11 @@ public static class Streams { */ private @Nullable String stateDir; + /** + * Whether the consumer should leave the group when stopping Kafka Streams. + */ + private boolean leaveGroupOnClose; + /** * Additional Kafka properties used to configure the streams. */ @@ -885,6 +890,14 @@ public void setStateDir(@Nullable String stateDir) { this.stateDir = stateDir; } + public boolean isLeaveGroupOnClose() { + return this.leaveGroupOnClose; + } + + public void setLeaveGroupOnClose(boolean leaveGroupOnClose) { + this.leaveGroupOnClose = leaveGroupOnClose; + } + public Map getProperties() { return this.properties; } diff --git a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java index 1c7184960063..410d72067254 100644 --- a/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaStreamsAnnotationDrivenConfiguration.java @@ -101,6 +101,7 @@ public void configure(StreamsBuilderFactoryBean factoryBean) { KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown()); factoryBean.setCleanupConfig(cleanupConfig); + factoryBean.setLeaveGroupOnClose(this.properties.getStreams().isLeaveGroupOnClose()); } @Override