Kafka Integration Testing with Testcontainers
@EmbeddedKafka has long been the standard for Kafka testing in Spring Boot. It works, but it starts a broker inside the test JVM rather than the broker image you actually deploy, so its version, configuration, and runtime environment can drift from production. KafkaContainer runs a real Confluent Kafka image in Docker. If you pin the same image tag your production environment uses, you test against the same broker binary, the same partition behavior, and the same consumer group offset management. This article shows how to test Kafka producers, consumers, and dead-letter topics with a real broker.
What You’ll Learn
- KafkaContainer setup and @ServiceConnection
- Testing @KafkaListener consumers with Awaitility
- Testing producers with KafkaTemplate
- Testing consumer group offset behavior
- Testing dead-letter topic routing
- @EmbeddedKafka vs KafkaContainer: when to use each
Dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Test dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<!-- Provides KafkaTestUtils, used in the KafkaTestUtils section of this article -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Awaitility for async assertions -->
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>
The Domain Model — Order Events
The e-commerce order system publishes events when order status changes:
public record OrderEvent(
    String orderId,
    String customerId,
    OrderStatus status,
    BigDecimal totalAmount,
    LocalDateTime occurredAt
) {}
The producer publishes events to Kafka:
@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    private static final String TOPIC = "order-events";

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(Order order) {
        OrderEvent event = new OrderEvent(
            order.getId().toString(),
            order.getCustomerId(),
            order.getStatus(),
            order.getTotalAmount(),
            LocalDateTime.now()
        );
        kafkaTemplate.send(TOPIC, order.getId().toString(), event);
    }
}
The consumer processes events:
@Service
public class OrderEventConsumer {

    private final List<OrderEvent> processedEvents = new CopyOnWriteArrayList<>();

    @KafkaListener(topics = "order-events", groupId = "order-processor")
    public void handleOrderEvent(OrderEvent event) {
        processedEvents.add(event);
        // business logic here
    }

    public List<OrderEvent> getProcessedEvents() {
        return Collections.unmodifiableList(processedEvents);
    }
}
Basic KafkaContainer Setup
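import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
class OrderEventPublisherTest {

    @Container
    @ServiceConnection
    static KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1"));

    @Autowired
    private OrderEventPublisher publisher;

    @Autowired
    private OrderEventConsumer consumer;
}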
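@ServiceConnection on KafkaContainer (available since Spring Boot 3.1) lets Spring Boot read the connection details from the running container and configure spring.kafka.bootstrap-servers automatically, so no @DynamicPropertySource method is needed.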
Testing Producer and Consumer Together
Kafka is asynchronous. The producer publishes a message; the consumer processes it on a different thread at some point in the near future. Your test needs to wait for the consumer to finish processing before asserting.
Awaitility is the standard solution for asserting async behavior:
// Static imports: org.awaitility.Awaitility.await and
// org.assertj.core.api.Assertions.assertThat
@Test
void shouldPublishOrderCreatedEventAndConsumeIt() {
    Order order = new Order("customer-1", OrderStatus.PENDING, BigDecimal.valueOf(99.99));
    order.setId(1L);

    publisher.publishOrderCreated(order);

    // Poll until the listener thread has recorded the event
    await()
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofMillis(200))
        .until(() -> consumer.getProcessedEvents().stream()
            .anyMatch(e -> e.orderId().equals("1"))
        );

    // hasSize(1) assumes no other test has published to order-events in this context
    List<OrderEvent> events = consumer.getProcessedEvents();
    assertThat(events).hasSize(1);
    assertThat(events.get(0).orderId()).isEqualTo("1");
    assertThat(events.get(0).status()).isEqualTo(OrderStatus.PENDING);
}
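await().atMost(Duration.ofSeconds(10)) re-evaluates the condition every 200 ms until it becomes true or the 10-second timeout expires, at which point the test fails with a ConditionTimeoutException. This is far more reliable than Thread.sleep(): the test continues as soon as the event arrives instead of always waiting a fixed, guessed interval.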
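To make the difference from Thread.sleep() concrete, here is a minimal plain-JDK sketch of the poll-until-true idea. It is not Awaitility's implementation; the class name PollingSketch and the helper awaitUntil are hypothetical names chosen for illustration, and a background thread stands in for the Kafka listener.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;

class PollingSketch {

    // Hypothetical helper: re-checks the condition every pollInterval until it
    // holds (returns true) or the atMost deadline has passed (returns false).
    public static boolean awaitUntil(Duration atMost, Duration pollInterval,
                                     BooleanSupplier condition) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> processed = new CopyOnWriteArrayList<>();

        // Stand-in for the listener thread: "consumes" order 1 after 300 ms.
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            processed.add("1");
        });
        listener.start();

        boolean seen = awaitUntil(Duration.ofSeconds(5), Duration.ofMillis(50),
                () -> processed.contains("1"));
        listener.join();
        System.out.println("seen=" + seen); // prints "seen=true"
    }
}
```

The loop returns shortly after the event shows up, so a generous upper bound costs nothing when the consumer is fast. A fixed sleep has to be sized for the slowest run and is paid on every run.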
Testing with KafkaTestUtils
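Spring Kafka provides KafkaTestUtils (in the spring-kafka-test module) for creating test consumers that read messages directly from topics. This lets you assert on the record the producer actually wrote, independently of your own listener: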
@Test
void shouldPublishCorrectMessageToTopic() throws InterruptedException {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
        kafka.getBootstrapServers(),
        "test-consumer-group",
        "true"
    );

    try (Consumer<String, String> testConsumer =
            new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer())) {
        testConsumer.subscribe(Collections.singletonList("order-events"));

        Order order = new Order("customer-1", OrderStatus.PENDING, BigDecimal.valueOf(99.99));
        order.setId(42L);
        publisher.publishOrderCreated(order);

        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(testConsumer, Duration.ofSeconds(10));

        assertThat(records.count()).isEqualTo(1);
        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("42");
        assertThat(record.value()).contains("customer-1");
    }
}
Testing Consumer Group Offset Management
The listener container commits offsets for the records it has processed, so a second batch published later should be delivered without the first batch being redelivered. The test below checks this by counting the events the consumer has seen. The exact-size assertions assume the shared OrderEventConsumer bean has not already received events from other tests.
@Test
void shouldResumeConsumptionFromLastCommittedOffset() throws Exception {
    // Publish 5 messages
    for (int i = 1; i <= 5; i++) {
        Order order = new Order("customer-" + i, OrderStatus.PENDING, BigDecimal.valueOf(i * 10.0));
        order.setId((long) i);
        publisher.publishOrderCreated(order);
    }

    // Wait for all 5 to be consumed
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> consumer.getProcessedEvents().size() >= 5);
    assertThat(consumer.getProcessedEvents()).hasSize(5);

    // Publish 3 more messages after initial consumption
    for (int i = 6; i <= 8; i++) {
        Order order = new Order("customer-" + i, OrderStatus.CONFIRMED, BigDecimal.valueOf(i * 10.0));
        order.setId((long) i);
        publisher.publishOrderCreated(order);
    }

    // Consumer should pick up exactly the 3 new messages
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> consumer.getProcessedEvents().size() >= 8);
    assertThat(consumer.getProcessedEvents()).hasSize(8);
}
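The behavior this test relies on can be illustrated with a toy in-memory model of one partition and per-group committed offsets. This is a sketch for intuition only: OffsetModel, append, and poll are hypothetical names, not Kafka APIs, and the model assumes a new group starts from the beginning of the log (comparable to auto.offset.reset=earliest).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OffsetModel {

    // A single-partition "topic": records are only ever appended.
    private final List<String> log = new ArrayList<>();

    // groupId -> offset of the next record that group should read.
    private final Map<String, Integer> committed = new HashMap<>();

    public void append(String record) {
        log.add(record);
    }

    // Returns everything after the group's committed offset, then commits
    // the new position. A group with no committed offset starts at 0.
    public List<String> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    public static void main(String[] args) {
        OffsetModel topic = new OffsetModel();
        for (int i = 1; i <= 5; i++) {
            topic.append("order-" + i);
        }
        System.out.println(topic.poll("order-processor").size()); // 5

        for (int i = 6; i <= 8; i++) {
            topic.append("order-" + i);
        }
        // Same group: only the three new records are returned.
        System.out.println(topic.poll("order-processor")); // [order-6, order-7, order-8]

        // A different group has its own offset and sees the whole log.
        System.out.println(topic.poll("another-group").size()); // 8
    }
}
```

The last line of main is also why group IDs matter in tests: offsets are tracked per group, so a second group on the same topic receives every record again.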
Testing Dead-Letter Topics
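When a listener still fails on a record after its retries are exhausted, Spring Kafka can route that record to a dead-letter topic, provided the error handler is configured with a DeadLetterPublishingRecoverer. Test that this routing works correctly: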
@Service
public class OrderEventConsumerWithDLT {

    private final List<OrderEvent> processedEvents = new CopyOnWriteArrayList<>();
    private final List<OrderEvent> deadLetterEvents = new CopyOnWriteArrayList<>();

    @KafkaListener(topics = "order-events", groupId = "order-processor-dlt")
    public void handleOrderEvent(OrderEvent event) {
        if (event.orderId().startsWith("FAIL-")) {
            throw new RuntimeException("Simulated processing failure for order: " + event.orderId());
        }
        processedEvents.add(event);
    }

    @KafkaListener(topics = "order-events.DLT", groupId = "order-processor-dlt-handler")
    public void handleDeadLetter(OrderEvent event) {
        deadLetterEvents.add(event);
    }

    public List<OrderEvent> getProcessedEvents() { return processedEvents; }
    public List<OrderEvent> getDeadLetterEvents() { return deadLetterEvents; }
}
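Spring Kafka retry + DLT configuration. By default, DeadLetterPublishingRecoverer publishes a failed record to a topic named after the original with a .DLT suffix (here order-events.DLT), on the same partition number as the original record: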
@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
            kafkaListenerContainerFactory(
                ConsumerFactory<String, OrderEvent> consumerFactory,
                KafkaTemplate<String, OrderEvent> kafkaTemplate
            ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(1000L, 2) // retry twice with 1s delay
        );
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
// Assumes the test class autowires two fields:
// KafkaTemplate<String, OrderEvent> kafkaTemplate and OrderEventConsumerWithDLT dltConsumer
@Test
void shouldRouteFailedMessagesToDeadLetterTopic() {
    // Publish a "good" message
    OrderEvent goodEvent = new OrderEvent("ORDER-1", "customer-1", OrderStatus.PENDING,
        BigDecimal.valueOf(99.99), LocalDateTime.now());
    kafkaTemplate.send("order-events", "ORDER-1", goodEvent);

    // Publish a "bad" message that will fail processing
    OrderEvent badEvent = new OrderEvent("FAIL-ORDER-2", "customer-2", OrderStatus.PENDING,
        BigDecimal.valueOf(49.99), LocalDateTime.now());
    kafkaTemplate.send("order-events", "FAIL-ORDER-2", badEvent);

    // Good message should be processed successfully
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> dltConsumer.getProcessedEvents().size() >= 1);
    assertThat(dltConsumer.getProcessedEvents())
        .extracting(OrderEvent::orderId)
        .contains("ORDER-1")
        .doesNotContain("FAIL-ORDER-2");

    // Failed message should end up in DLT (after retries)
    await()
        .atMost(Duration.ofSeconds(30)) // allow time for retries + DLT routing
        .until(() -> dltConsumer.getDeadLetterEvents().size() >= 1);
    assertThat(dltConsumer.getDeadLetterEvents())
        .extracting(OrderEvent::orderId)
        .contains("FAIL-ORDER-2");
}
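The 30-second timeout has to cover at least the retry back-off before the record reaches the dead-letter topic. As a rough worked example, the arithmetic for FixedBackOff(1000L, 2) can be sketched in plain Java. RetryBudget and its methods are hypothetical helpers for illustration, not part of Spring Kafka, and the sketch ignores listener execution time and broker latency.

```java
import java.time.Duration;

class RetryBudget {

    // Total deliveries to the listener: the initial attempt plus each retry.
    public static int deliveryAttempts(int maxRetries) {
        return 1 + maxRetries;
    }

    // Time spent waiting between attempts before the record is handed
    // to the recoverer (one back-off interval per retry).
    public static Duration totalBackOff(Duration interval, int maxRetries) {
        return interval.multipliedBy(maxRetries);
    }

    public static void main(String[] args) {
        Duration interval = Duration.ofMillis(1000); // first FixedBackOff argument
        int maxRetries = 2;                          // second FixedBackOff argument

        System.out.println(deliveryAttempts(maxRetries));                  // 3
        System.out.println(totalBackOff(interval, maxRetries).toMillis()); // 2000
    }
}
```

So the failing record is delivered three times and waits about two seconds in back-off. The rest of the 30-second budget is headroom for partition assignment, publishing to order-events.DLT, and the dead-letter listener picking the record up.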
@EmbeddedKafka vs KafkaContainer
| | @EmbeddedKafka | KafkaContainer |
|---|---|---|
| Speed | Faster (in-process) | Slower (Docker startup) |
| Implementation | Apache Kafka (Scala, in-process) | Confluent Kafka (Docker) |
| Broker parity | Partial: some behaviors differ | Full, when the image tag matches production |
| Consumer groups | Mostly works | Full fidelity |
| Exactly-once semantics | Limited | Full support |
| Streams API testing | Limited | Full support |
| Connector testing | Not supported | Supported |
| Use case | Fast unit-like tests | Integration tests with production fidelity |
Use @EmbeddedKafka for fast tests that need Kafka but do not rely on broker-specific behavior. Use KafkaContainer when production parity matters — DLT behavior, consumer group rebalancing, exactly-once semantics, or Kafka Streams topology testing.
Common Pitfalls
Not using Awaitility. Thread.sleep() is unreliable for async tests. Use await().atMost() with a condition check.
Consumer group ID conflicts between tests. If two tests use the same consumer group ID, one test’s consumer may read messages intended for the other test’s consumer. Use unique consumer group IDs per test or clear topic offsets between tests.
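One simple way to get a unique group ID per test is to append a random suffix. The following is a minimal sketch; GroupIds and uniqueGroupId are hypothetical names used for illustration.

```java
import java.util.UUID;

class GroupIds {

    // Hypothetical helper: returns a different group ID on every call,
    // keeping a readable prefix so the group is easy to spot in broker logs.
    public static String uniqueGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String first = uniqueGroupId("test-consumer-group");
        String second = uniqueGroupId("test-consumer-group");

        System.out.println(first.startsWith("test-consumer-group-")); // true
        System.out.println(first.equals(second));                     // false
    }
}
```

The result can be passed as the group argument to KafkaTestUtils.consumerProps in place of the fixed "test-consumer-group" string. A fresh group has no committed offsets, so where it starts reading is decided by the consumer's auto.offset.reset setting. Records left in the topic by earlier tests may therefore still be visible to it.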
Insufficient timeout. Kafka consumer group rebalancing after a new consumer joins the group takes 3–10 seconds. Tests that assert immediately after starting a consumer often fail because the consumer has not yet been assigned a partition. Set generous timeouts (15–30 seconds) for consumer tests.
Summary
KafkaContainer gives you a real Kafka broker with full production fidelity for integration tests. Awaitility handles async message consumption assertions. Dead-letter topic testing verifies retry and routing behavior. For tests that need production-accurate consumer group management and exactly-once semantics, KafkaContainer is the right tool.
The next article covers RabbitMQ testing — exchanges, queues, routing keys, and Spring AMQP listener testing.