Kafka Integration Testing with Testcontainers

@EmbeddedKafka has long been the standard for Kafka testing in Spring Boot. It works, but it runs a different Kafka implementation than production — one that can accept messages and serialize records in ways that a real Kafka broker would not. KafkaContainer runs the same Confluent Kafka image your production environment uses, with the same broker configuration, same partition behavior, and same consumer group offset management. This article shows how to test Kafka producers, consumers, and dead-letter topics with a real broker.


What You’ll Learn

  • KafkaContainer setup and @ServiceConnection
  • Testing @KafkaListener consumers with Awaitility
  • Testing producers with KafkaTemplate
  • Testing consumer group offset behavior
  • Testing dead-letter topic routing
  • @EmbeddedKafka vs KafkaContainer — when to use each

Dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<!-- Awaitility for async assertions -->
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <scope>test</scope>
</dependency>

The Domain Model — Order Events

The e-commerce order system publishes events when order status changes:

public record OrderEvent(
    String orderId,
    String customerId,
    OrderStatus status,
    BigDecimal totalAmount,
    LocalDateTime occurredAt
) {}

The producer publishes events to Kafka:

@Service
public class OrderEventPublisher {

    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
    private static final String TOPIC = "order-events";

    public OrderEventPublisher(KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(Order order) {
        OrderEvent event = new OrderEvent(
            order.getId().toString(),
            order.getCustomerId(),
            order.getStatus(),
            order.getTotalAmount(),
            LocalDateTime.now()
        );
        kafkaTemplate.send(TOPIC, order.getId().toString(), event);
    }
}

The consumer processes events:

@Service
public class OrderEventConsumer {

    private final List<OrderEvent> processedEvents = new CopyOnWriteArrayList<>();

    @KafkaListener(topics = "order-events", groupId = "order-processor")
    public void handleOrderEvent(OrderEvent event) {
        processedEvents.add(event);
        // business logic here
    }

    public List<OrderEvent> getProcessedEvents() {
        return Collections.unmodifiableList(processedEvents);
    }
}

Basic KafkaContainer Setup

@SpringBootTest
@Testcontainers
class OrderEventPublisherTest {

    @Container
    @ServiceConnection
    static KafkaContainer kafka =
        new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1"));

    @Autowired
    private OrderEventPublisher publisher;

    @Autowired
    private OrderEventConsumer consumer;
}

@ServiceConnection on KafkaContainer configures spring.kafka.bootstrap-servers automatically.


Testing Producer and Consumer Together

Kafka is asynchronous. The producer publishes a message; the consumer processes it on a different thread at some point in the near future. Your test needs to wait for the consumer to finish processing before asserting.

Awaitility is the standard solution for asserting async behavior:

@Test
void shouldPublishOrderCreatedEventAndConsumeIt() {
    Order order = new Order("customer-1", OrderStatus.PENDING, BigDecimal.valueOf(99.99));
    order.setId(1L);

    publisher.publishOrderCreated(order);

    await()
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofMillis(200))
        .until(() -> consumer.getProcessedEvents().stream()
            .anyMatch(e -> e.orderId().equals("1"))
        );

    List<OrderEvent> events = consumer.getProcessedEvents();
    assertThat(events).hasSize(1);
    assertThat(events.get(0).orderId()).isEqualTo("1");
    assertThat(events.get(0).status()).isEqualTo(OrderStatus.PENDING);
}

await().atMost(10 seconds) retries the condition check every 200ms until it becomes true or the 10-second timeout expires. This is far more reliable than Thread.sleep().


Testing with KafkaTestUtils

Spring Kafka provides KafkaTestUtils for creating test consumers that read messages directly from topics:

@Test
void shouldPublishCorrectMessageToTopic() throws InterruptedException {
    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
        kafka.getBootstrapServers(),
        "test-consumer-group",
        "true"
    );

    try (Consumer<String, String> testConsumer =
             new KafkaConsumer<>(consumerProps, new StringDeserializer(), new StringDeserializer())) {

        testConsumer.subscribe(Collections.singletonList("order-events"));

        Order order = new Order("customer-1", OrderStatus.PENDING, BigDecimal.valueOf(99.99));
        order.setId(42L);
        publisher.publishOrderCreated(order);

        ConsumerRecords<String, String> records =
            KafkaTestUtils.getRecords(testConsumer, Duration.ofSeconds(10));

        assertThat(records.count()).isEqualTo(1);

        ConsumerRecord<String, String> record = records.iterator().next();
        assertThat(record.key()).isEqualTo("42");
        assertThat(record.value()).contains("customer-1");
    }
}

Testing Consumer Group Offset Management

@Test
void shouldResumeConsumptionFromLastCommittedOffset() throws Exception {
    // Publish 5 messages
    for (int i = 1; i <= 5; i++) {
        Order order = new Order("customer-" + i, OrderStatus.PENDING, BigDecimal.valueOf(i * 10.0));
        order.setId((long) i);
        publisher.publishOrderCreated(order);
    }

    // Wait for all 5 to be consumed
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> consumer.getProcessedEvents().size() >= 5);

    assertThat(consumer.getProcessedEvents()).hasSize(5);

    // Publish 3 more messages after initial consumption
    for (int i = 6; i <= 8; i++) {
        Order order = new Order("customer-" + i, OrderStatus.CONFIRMED, BigDecimal.valueOf(i * 10.0));
        order.setId((long) i);
        publisher.publishOrderCreated(order);
    }

    // Consumer should pick up exactly the 3 new messages
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> consumer.getProcessedEvents().size() >= 8);

    assertThat(consumer.getProcessedEvents()).hasSize(8);
}

Testing Dead-Letter Topics

When a consumer fails to process a message (after retries), Spring Kafka routes it to a dead-letter topic. Test that this routing works correctly:

@Service
public class OrderEventConsumerWithDLT {

    private final List<OrderEvent> processedEvents = new CopyOnWriteArrayList<>();
    private final List<OrderEvent> deadLetterEvents = new CopyOnWriteArrayList<>();

    @KafkaListener(topics = "order-events", groupId = "order-processor-dlt")
    public void handleOrderEvent(OrderEvent event) {
        if (event.orderId().startsWith("FAIL-")) {
            throw new RuntimeException("Simulated processing failure for order: " + event.orderId());
        }
        processedEvents.add(event);
    }

    @KafkaListener(topics = "order-events.DLT", groupId = "order-processor-dlt-handler")
    public void handleDeadLetter(OrderEvent event) {
        deadLetterEvents.add(event);
    }

    public List<OrderEvent> getProcessedEvents() { return processedEvents; }
    public List<OrderEvent> getDeadLetterEvents() { return deadLetterEvents; }
}

Spring Kafka retry + DLT configuration:

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, OrderEvent>
    kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory,
        KafkaTemplate<String, OrderEvent> kafkaTemplate
    ) {
        ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            new FixedBackOff(1000L, 2)  // retry twice with 1s delay
        );
        factory.setCommonErrorHandler(errorHandler);

        return factory;
    }
}
@Test
void shouldRouteFailedMessagesToDeadLetterTopic() {
    // Publish a "good" message
    OrderEvent goodEvent = new OrderEvent("ORDER-1", "customer-1", OrderStatus.PENDING,
        BigDecimal.valueOf(99.99), LocalDateTime.now());
    kafkaTemplate.send("order-events", "ORDER-1", goodEvent);

    // Publish a "bad" message that will fail processing
    OrderEvent badEvent = new OrderEvent("FAIL-ORDER-2", "customer-2", OrderStatus.PENDING,
        BigDecimal.valueOf(49.99), LocalDateTime.now());
    kafkaTemplate.send("order-events", "FAIL-ORDER-2", badEvent);

    // Good message should be processed successfully
    await()
        .atMost(Duration.ofSeconds(15))
        .until(() -> dltConsumer.getProcessedEvents().size() >= 1);

    assertThat(dltConsumer.getProcessedEvents())
        .extracting(OrderEvent::orderId)
        .contains("ORDER-1")
        .doesNotContain("FAIL-ORDER-2");

    // Failed message should end up in DLT (after retries)
    await()
        .atMost(Duration.ofSeconds(30))  // allow time for retries + DLT routing
        .until(() -> dltConsumer.getDeadLetterEvents().size() >= 1);

    assertThat(dltConsumer.getDeadLetterEvents())
        .extracting(OrderEvent::orderId)
        .contains("FAIL-ORDER-2");
}

@EmbeddedKafka vs KafkaContainer

@EmbeddedKafkaKafkaContainer
SpeedFaster (in-process)Slower (Docker startup)
ImplementationApache Kafka (scala in-process)Confluent Kafka (Docker)
Broker parityPartial — some behaviors differFull — same binary as production
Consumer groupsMostly worksFull fidelity
Exactly-once semanticsLimitedFull support
Streams API testingLimitedFull support
Connector testingNot supportedSupported
Use caseFast unit-like testsIntegration tests with production fidelity

Use @EmbeddedKafka for fast tests that need Kafka but do not rely on broker-specific behavior. Use KafkaContainer when production parity matters — DLT behavior, consumer group rebalancing, exactly-once semantics, or Kafka Streams topology testing.


Common Pitfalls

Not using Awaitility. Thread.sleep() is unreliable for async tests. Use await().atMost() with a condition check.

Consumer group ID conflicts between tests. If two tests use the same consumer group ID, one test’s consumer may read messages intended for the other test’s consumer. Use unique consumer group IDs per test or clear topic offsets between tests.

Insufficient timeout. Kafka consumer group rebalancing after a new consumer joins the group takes 3–10 seconds. Tests that assert immediately after starting a consumer often fail because the consumer has not yet been assigned a partition. Set generous timeouts (15–30 seconds) for consumer tests.


Summary

KafkaContainer gives you a real Kafka broker with full production fidelity for integration tests. Awaitility handles async message consumption assertions. Dead-letter topic testing verifies retry and routing behavior. For tests that need production-accurate consumer group management and exactly-once semantics, KafkaContainer is the right tool.

The next article covers RabbitMQ testing — exchanges, queues, routing keys, and Spring AMQP listener testing.

Next: RabbitMQ Testing with Testcontainers