1. Introduction

Kafka for JUnit enables developers to start and stop a complete Kafka cluster comprised of Kafka brokers and distributed Kafka Connect workers from within a JUnit test. It also provides a rich set of convenient accessors to interact with such an embedded Kafka cluster in a lean and non-obtrusive way.

Kafka for JUnit can be used both to whitebox-test individual Kafka-based components of your application and to blackbox-test applications that offer an incoming and/or outgoing Kafka-based interface.

2. Using Kafka for JUnit in your tests

Kafka for JUnit provides the necessary infrastructure to exercise your Kafka-based components against an embeddable Kafka cluster (cf. section Working with an embedded Kafka cluster). However, Kafka for JUnit also has you covered if you simply want to use the convenient accessors against Kafka clusters that are already present in your infrastructure (cf. section Working with an external Kafka cluster).

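import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;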

class KafkaTest {

    private EmbeddedKafkaCluster kafka;

    @BeforeEach
    void setupKafka() {
        kafka = provisionWith(defaultClusterConfig());
        kafka.start();
    }

    @AfterEach
    void tearDownKafka() {
        kafka.stop();
    }

    @Test
    void shouldWaitForRecordsToBePublished() throws Exception {
        kafka.send(to("test-topic", "a", "b", "c"));
        kafka.observe(on("test-topic", 3));
    }
}

This starts an embedded Kafka cluster and submits three records to the topic named test-topic. The call to kafka.observe(on("test-topic", 3)) watches that same topic for a configurable amount of time and checks whether it observes the previously submitted records. If it does not, Kafka for JUnit raises an AssertionError, which fails the test. Kafka for JUnit provides many more ways to interact with a Kafka cluster.

Since EmbeddedKafkaCluster implements the AutoCloseable interface, you can achieve the same behavior using a try-with-resources construct.

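import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.Test;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;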

class KafkaTest {

  @Test
  void shouldWaitForRecordsToBePublished() throws Exception {

    try (EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig())) {
      kafka.start();
      kafka.send(to("test-topic", "a", "b", "c"));
      kafka.observe(on("test-topic", 3));
    }
  }
}

2.1. Supported versions of Apache Kafka

Version of Kafka for JUnit    Supports
0.1.x                         Apache Kafka 1.0.0
0.2.x                         Apache Kafka 1.0.0
0.3.x                         Apache Kafka 1.0.0
1.0.x                         Apache Kafka 1.1.1
2.0.x                         Apache Kafka 2.0.0
2.1.x                         Apache Kafka 2.1.1
2.2.x                         Apache Kafka 2.2.1
2.3.x                         Apache Kafka 2.3.0
2.4.x                         Apache Kafka 2.4.0
2.5.x                         Apache Kafka 2.5.1
2.6.x                         Apache Kafka 2.6.0
2.7.x                         Apache Kafka 2.7.0

3. Working with an embedded Kafka cluster

Kafka for JUnit is able to spin up a fully-fledged embedded Kafka cluster that is accessible via class EmbeddedKafkaCluster. EmbeddedKafkaCluster implements the interfaces RecordProducer, RecordConsumer and TopicManager and thus provides convenient accessors to interact with the cluster.

Using EmbeddedKafkaCluster in a JUnit test is quite simple. The necessary code to set it up is minimal if you are comfortable with the default configuration.

import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;

class KafkaTest {

    private EmbeddedKafkaCluster kafka;

    @BeforeEach
    void setupKafka() {
        kafka = provisionWith(defaultClusterConfig());
        kafka.start();
    }

    @AfterEach
    void tearDownKafka() {
        kafka.stop();
    }
}

Kafka for JUnit uses the Builder pattern extensively to provide a fluent API when provisioning an embedded Kafka cluster. Let’s take a closer look at method EmbeddedKafkaCluster.provisionWith. This method consumes a configuration of type EmbeddedKafkaClusterConfig. EmbeddedKafkaClusterConfig uses defaults for the Kafka broker and ZooKeeper. By default, Kafka Connect will not be provisioned at all. The builder of EmbeddedKafkaClusterConfig provides a configure method that is overloaded to accept configurations of type EmbeddedZooKeeperConfig, EmbeddedKafkaConfig and EmbeddedConnectConfig. The following listing demonstrates how to adjust the configuration of the embedded Kafka broker with respect to the default number of partitions for newly created topics.

EmbeddedKafkaCluster kafka = provisionWith(newClusterConfig()
    .configure(brokers()
        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")));

The builders for those configurations provide a uniform interface for overriding defaults, comprising two methods: with(String propertyName, T value) and withAll(java.util.Properties overrides). To override a default value, you simply provide the name of the configuration parameter as defined by the respective Kafka component along with the new value.
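The override contract described above can be illustrated without the library. The following is a minimal sketch, not the implementation used by Kafka for JUnit: the class ConfigBuilderSketch, its default values and its build method are hypothetical, and only the two method shapes with(String, T) and withAll(Properties) are taken from the text.

```java
import java.util.Properties;

// Hypothetical stand-in for a configuration builder; not the library's code.
final class ConfigBuilderSketch {

    private final Properties properties = new Properties();

    ConfigBuilderSketch() {
        // Assumed defaults, chosen for illustration only.
        properties.put("num.partitions", "1");
        properties.put("auto.create.topics.enable", "true");
    }

    // Overrides a single parameter; the value is stored in its String form.
    <T> ConfigBuilderSketch with(String propertyName, T value) {
        properties.put(propertyName, String.valueOf(value));
        return this;
    }

    // Applies a whole set of overrides at once.
    ConfigBuilderSketch withAll(Properties overrides) {
        for (String name : overrides.stringPropertyNames()) {
            properties.put(name, overrides.getProperty(name));
        }
        return this;
    }

    // Returns a copy so that callers cannot mutate the builder's state.
    Properties build() {
        Properties copy = new Properties();
        copy.putAll(properties);
        return copy;
    }
}
```

In this sketch, later overrides win over earlier ones and over the defaults, so new ConfigBuilderSketch().with("num.partitions", 5).build() carries the value "5" for num.partitions while all other defaults stay untouched.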

Using the default setting will provide you with a single embedded Kafka broker. This ought to be sufficient for most cases. However, there are scenarios which require testing against multiple brokers that form a cluster. Forming an embedded cluster with multiple brokers is done by adjusting the default provisioning of your test case. See the listing underneath for an example.

EmbeddedKafkaCluster kafka = provisionWith(newClusterConfig()
    .configure(brokers()
        .withNumberOfBrokers(3)
        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
        .with(KafkaConfig$.MODULE$.DefaultReplicationFactorProp(), "3")
        .with(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2")
        .with(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), "3")
        .with(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), "3")
        .with(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), "2")));

Using this configuration, we end up with a total of three brokers that form an embedded Kafka cluster, while the defaults for topic partitions and replicas have been adjusted to be consistent with the size of the cluster.
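What "consistent with the size of the cluster" means can be sketched as a small check. The helper below is hypothetical and not part of Kafka for JUnit. It encodes two general Kafka constraints: a topic cannot have more replicas than there are brokers, and a minimum ISR size above the replication factor can never be satisfied by producers that require full acknowledgement.

```java
// Hypothetical helper; it only checks the arithmetic relation between the settings.
final class ClusterSettingsCheck {

    // Returns true if the replication settings can be satisfied by the given number of brokers.
    static boolean isConsistent(int brokers, int replicationFactor, int minInSyncReplicas) {
        return brokers >= 1
                && replicationFactor >= 1
                && replicationFactor <= brokers
                && minInSyncReplicas >= 1
                && minInSyncReplicas <= replicationFactor;
    }
}
```

For the listing above, isConsistent(3, 3, 2) holds, whereas the same replication settings on the default single-broker cluster, isConsistent(1, 3, 2), do not.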

Of course, you can also use the try-with-resources pattern to fire up an embedded cluster. Have a look at the following test setup.

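import net.mguenther.kafka.junit.EmbeddedKafkaCluster;
import org.junit.jupiter.api.Test;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;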

class KafkaTest {

  @Test
  void shouldWaitForRecordsToBePublished() throws Exception {

    try (EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig())) {
      kafka.start();
      kafka.send(to("test-topic", "a", "b", "c"));
      kafka.observe(on("test-topic", 3));
    }
  }
}

See sections on Producing records, Consuming records and Managing topics for further reference on how to interact with the cluster.

3.1. Failure Modes

EmbeddedKafkaCluster provides the means to disconnect and, of course, re-connect specific embedded Kafka brokers. All brokers in the embedded cluster are assigned a broker ID during cluster formation. This broker ID is an Integer-based value and starts at 1. The broker ID increases by 1 for every subsequent embedded Kafka broker that is started during cluster formation.

Clusters stay fixed with respect to the maximum number of embedded brokers. But individual brokers can, given their broker ID, be disconnected from the rest of the cluster to test for failure scenarios. Such failure scenarios include:

  • How does my Kafka-based component behave in the presence of broker outages?

  • What happens if the In-Sync-Replica Set (ISR) of a topic that my application consumes from shrinks below its minimum size?

  • Is my application able to progress after brokers re-connect and form a working cluster?

3.1.1. Disconnect and reconnect a single broker

The following listing shows how to disconnect and re-connect a certain broker, while fetching the ISR of a dedicated topic in between these operations to determine whether the cluster behaves correctly.

If you do use this feature of Kafka for JUnit, then please give the embedded cluster some time to handle broker churn. Identifying that a leader for a topic-partition is not available and conducting the leader election takes some time. In the example underneath we introduce a delay of five seconds in between operations that affect cluster membership.

kafka.createTopic(TopicConfig.withName("test-topic")
    .withNumberOfPartitions(5)
    .withNumberOfReplicas(3));

delay(5);

Set<Integer> leaders = kafka.fetchLeaderAndIsr("test-topic")
    .values()
    .stream()
    .map(LeaderAndIsr::getLeader)
    .collect(Collectors.toSet());

assertThat(leaders.contains(1)).isTrue();
assertThat(leaders.contains(2)).isTrue();
assertThat(leaders.contains(3)).isTrue();

kafka.disconnect(1);

delay(5);

Set<Integer> leadersAfterDisconnect = kafka.fetchLeaderAndIsr("test-topic")
    .values()
    .stream()
    .map(LeaderAndIsr::getLeader)
    .collect(Collectors.toSet());

assertThat(leadersAfterDisconnect.contains(1)).isFalse();
assertThat(leadersAfterDisconnect.contains(2)).isTrue();
assertThat(leadersAfterDisconnect.contains(3)).isTrue();

kafka.connect(1);

delay(5);

Set<Integer> leadersAfterReconnect = kafka.fetchLeaderAndIsr("test-topic")
    .values()
    .stream()
    .map(LeaderAndIsr::getLeader)
    .collect(Collectors.toSet());

assertThat(leadersAfterReconnect.contains(1)).isTrue();
assertThat(leadersAfterReconnect.contains(2)).isTrue();
assertThat(leadersAfterReconnect.contains(3)).isTrue();
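The listing calls delay(5) without showing its definition. One possible definition is sketched below. The class name TestDelays and the two-argument overload are assumptions made for illustration; the listing only implies a single-argument helper that takes seconds.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical test helper; the original listing does not show how delay is defined.
final class TestDelays {

    // Blocks the calling thread for the given number of seconds.
    static void delay(long seconds) {
        delay(seconds, TimeUnit.SECONDS);
    }

    // Blocks the calling thread for the given duration.
    static void delay(long amount, TimeUnit unit) {
        try {
            unit.sleep(amount);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail fast instead of swallowing the interrupt.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }
}
```

With a static import of TestDelays.delay, the calls in the listing compile as written. A fixed delay keeps the example simple, but it makes the test as slow as the longest expected election.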

3.1.2. Disconnect until In-Sync-Replica Set falls below minimum size

The following listing shows how to disconnect brokers that belong to the In-Sync-Replica Set (ISR) of a given topic until the ISR falls below its minimum size.

If you do use this feature of Kafka for JUnit, then please give the embedded cluster some time to handle broker churn. Identifying that a leader for a topic-partition is not available and conducting the leader election takes some time. In the example underneath we introduce a delay of five seconds in between operations that affect cluster membership.

// Create a topic and configure the number of replicas as well as the size of the ISR

kafka.createTopic(TopicConfig.withName("test-topic")
    .withNumberOfPartitions(5)
    .withNumberOfReplicas(3)
    .with("min.insync.replicas", "2"));

// Wait a bit to give the cluster a chance to properly assign topic-partitions to leaders

delay(5);

// Disconnect until the remaining number of brokers falls below the minimum ISR size

kafka.disconnectUntilIsrFallsBelowMinimumSize("test-topic");

delay(5);

// Submitting records to this topic will yield a NotEnoughReplicasException

kafka.send(SendValues.to("test-topic", "A"));

The last line of the listing shows the effect of an ISR that can no longer operate reliably. Your Kafka-based component or application would run concurrently to this test so that you are able to observe if it behaves correctly (e.g. by checking that the component progresses normally if the ISR is restored).
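The arithmetic behind this scenario can be sketched as follows. The helper is hypothetical and does not describe how Kafka for JUnit selects brokers. It assumes that all replicas are in sync at the start and that the producer requires acknowledgement from the full ISR (acks=all), which is the setting under which min.insync.replicas is enforced.

```java
// Hypothetical helper that models only the sizes involved, not the brokers themselves.
final class IsrMath {

    // Number of in-sync replicas that have to drop out before the ISR is
    // smaller than minInSyncReplicas, starting from a fully in-sync replica set.
    static int brokersToDisconnect(int replicas, int minInSyncReplicas) {
        if (minInSyncReplicas < 1 || minInSyncReplicas > replicas) {
            throw new IllegalArgumentException("minInSyncReplicas must be between 1 and " + replicas);
        }
        return replicas - minInSyncReplicas + 1;
    }

    // A write that requires full acknowledgement succeeds only while the ISR is large enough.
    static boolean canAcceptWrites(int currentIsrSize, int minInSyncReplicas) {
        return currentIsrSize >= minInSyncReplicas;
    }
}
```

For the topic in the listing, with three replicas and min.insync.replicas set to 2, two brokers have to be disconnected. The ISR then has size 1, and canAcceptWrites(1, 2) is false.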

3.1.3. Restoring the In-Sync-Replica Set

Restoring the In-Sync-Replica Set is easy, as method disconnectUntilIsrFallsBelowMinimumSize returns the set of broker IDs for all brokers that have been deactivated during the shrinking. The following listing shows how to restore the ISR.

kafka.createTopic(TopicConfig.withName("test-topic")
    .withNumberOfPartitions(5)
    .withNumberOfReplicas(3)
    .with("min.insync.replicas", "2"));

delay(5);

Set<Integer> disconnectedBrokers = kafka.disconnectUntilIsrFallsBelowMinimumSize("test-topic");

delay(5);

// Do some testing, trigger some operations, observe the behavior of your application

kafka.connect(disconnectedBrokers);

// Give the cluster some time to assign leaders and reestablish the ISR

delay(5);

// Do some more testing ...

4. Working with an external Kafka cluster

Kafka for JUnit can be used to work with an external Kafka cluster. This is useful if you want to execute your tests against a staging/testing environment or if you already use other testing libraries (e.g. Testcontainers) that spin up a Kafka cluster on your local machine, but want to use the convenient accessors provided by Kafka for JUnit.

Class ExternalKafkaCluster integrates an external cluster. Just like EmbeddedKafkaCluster, an ExternalKafkaCluster also implements the interfaces RecordProducer, RecordConsumer and TopicManager and thus provides convenient accessors to interact with the cluster.

Using ExternalKafkaCluster in a JUnit test is easy. The listing below shows the necessary code to use ExternalKafkaCluster in combination with Testcontainers.

@Testcontainers
class ExternalKafkaClusterTest {

    // This is not part of Kafka for JUnit, but a sub-module provided
    // by Testcontainers (org.testcontainers:kafka)
    @Container
    private KafkaContainer kafkaContainer = new KafkaContainer();

    @Test
    @DisplayName("should be able to observe records written to an external Kafka cluster")
    void externalKafkaClusterShouldWorkWithExternalResources() throws Exception {

        ExternalKafkaCluster kafka = ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());

        // use the accessors that cluster provides to interact with the Kafka container

        [...]
    }
}

See sections on Producing records, Consuming records and Managing topics for further reference on how to interact with the cluster.

5. Producing records

Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for producing new Kafka records. Have a look at the RecordProducer interface (Javadoc omitted for brevity).

public interface RecordProducer {

    <V> List<RecordMetadata> send(SendValues<V> sendRequest) throws InterruptedException;
    <V> List<RecordMetadata> send(SendValuesTransactional<V> sendRequest) throws InterruptedException;
    <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendRequest) throws InterruptedException;
    <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendRequest) throws InterruptedException;
}

Calling send using an instance of SendValues does just that: It produces non-keyed Kafka records that only feature a value. The key of a record that has been produced this way is simply null. If you wish to associate a key, you can do so by passing an instance of SendKeyValues to the send method. Both SendValues and SendKeyValues use the Builder pattern so that creating the resp. send parameterization is easy and does not pollute your test code with any kind of boilerplate.

Implementations of the RecordProducer interface use the high-level producer API that comes with Apache Kafka. Hence, the underlying producer is a KafkaProducer. This KafkaProducer is fully parameterizable via the builders of both SendValues and SendKeyValues.

All send operations are executed synchronously.

With these abstractions in place, sending content to your embedded Kafka cluster is easy. Have a look at the following examples. One thing you should notice is that you do not have to specify bootstrap.servers. Kafka for JUnit adjusts a given client configuration so that you can start off with meaningful defaults that work out-of-the-box. You’ll only have to provide configuration overrides if your test actually requires them.

5.1. Sending non-keyed values using defaults

kafka.send(SendValues.to("test-topic", "a", "b", "c"));

5.2. Sending non-keyed values using overrides

kafka.send(SendValues.to("test-topic", "a", "b", "c")
    .with(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    .with(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"));

5.3. Sending non-keyed values transactionally

kafka
    .send(SendValuesTransactional
    .inTransaction("test-topic", Arrays.asList("a", "b", "c")));

The API of Kafka for JUnit has been designed with great care and readability in mind. Using static imports for factory methods shows that we can interact with the embedded Kafka cluster in a lean and readable way.

kafka.send(inTransaction("test-topic", Arrays.asList("a", "b", "c")));

5.4. Sending keyed records using defaults

List<KeyValue<String, String>> records = new ArrayList<>();

records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));

kafka.send(SendKeyValues.to("test-topic", records));

5.5. Sending keyed records using overrides

List<KeyValue<String, String>> records = new ArrayList<>();

records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));

kafka.send(SendKeyValues.to("test-topic", records)
    .with(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
    .with(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"));

5.6. Sending keyed records transactionally

List<KeyValue<String, String>> records = new ArrayList<>();

records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));

kafka.send(inTransaction("test-topic", records));

5.7. Sending records or values transactionally to multiple topics

kafka.send(SendValuesTransactional
    .inTransaction("test-topic-1", Arrays.asList("a", "b"))
    .inTransaction("test-topic-2", Arrays.asList("c", "d")));

5.8. Failing a transaction on purpose

kafka.send(SendValuesTransactional
    .inTransaction("test-topic", Arrays.asList("a", "b"))
    .failTransaction());

Defining a SendValuesTransactional request with failTransaction will write records to the Kafka log, but abort the transaction they belong to. This allows you to test whether your application-specific Kafka consumers adhere to the transactional guarantees they claim to satisfy: a correct implementation of a consumer with isolation.level set to read_committed must neither see nor process those records.
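The visibility rule that such a test exercises can be sketched with a simplified in-memory model. The class below is hypothetical and deliberately ignores offsets, control records and transactions that are still open. It only shows that values from an aborted transaction are part of the log, yet are skipped by a reader that uses read_committed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model; not how Kafka or Kafka for JUnit implement isolation levels.
final class IsolationSketch {

    // A value together with the outcome of the transaction it was written in.
    static final class LoggedValue {
        final String value;
        final boolean committed;

        LoggedValue(String value, boolean committed) {
            this.value = value;
            this.committed = committed;
        }
    }

    // read_committed skips values of aborted transactions; any other level returns every value.
    static List<String> read(List<LoggedValue> log, String isolationLevel) {
        boolean committedOnly = "read_committed".equals(isolationLevel);
        List<String> visible = new ArrayList<>();
        for (LoggedValue entry : log) {
            if (!committedOnly || entry.committed) {
                visible.add(entry.value);
            }
        }
        return visible;
    }
}
```

If "a" and "b" were written in an aborted transaction and "c" in a committed one, a read with read_committed yields only "c", while a read with read_uncommitted yields all three values.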

This works for SendKeyValuesTransactional as well.

5.9. Attaching record headers

KeyValue<String, String> record = new KeyValue<>("a", "b");
record.addHeader("client", "kafka-junit-test".getBytes(StandardCharsets.UTF_8));

kafka.send(SendKeyValues
    .to("test-topic", Collections.singletonList(record)));

You can also pre-construct an instance of Headers and pass it along via the constructor of a KeyValue.

6. Consuming records

Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for consuming Kafka records. Have a look at the RecordConsumer interface (Javadoc omitted for brevity).

public interface RecordConsumer {

    <V> List<V> readValues(ReadKeyValues<String, V> readRequest);
    <V> List<V> observeValues(ObserveKeyValues<String, V> observeRequest) throws InterruptedException;
    <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readRequest);
    <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeRequest) throws InterruptedException;
}

Implementations of the RecordConsumer interface use the high-level consumer API that comes with Apache Kafka. Hence, the underlying consumer is a KafkaConsumer. This KafkaConsumer is fully parameterizable via both ReadKeyValues and ObserveKeyValues by means of the provided builders.

All operations are executed synchronously.

With these abstractions in place, reading content from a Kafka topic is easy. As with a RecordProducer, there is no need to specify things like bootstrap.servers - Kafka for JUnit will provide the necessary configuration. Have a look at the following examples.

6.1. Consuming values using defaults

List<String> values = kafka.readValues(ReadKeyValues.from("test-topic"));

By default, ReadKeyValues.from uses StringDeserializer.class for both the record key and value. Calling readValues just yields the values from the consumed Kafka records. Please have a look at the next example if you are interested in obtaining not only the values, but also the record key and possibly attached headers.

6.2. Consuming key-value based records using defaults

List<KeyValue<String, String>> consumedRecords = kafka.read(ReadKeyValues.from("test-topic"));

Notice the difference to the example before this one: Instead of calling readValues we call read using the same ReadKeyValues request. Instead of a List<String>, this yields a List<KeyValue<String, String>> where each KeyValue is comprised of the record key, the record value and the headers that have been attached to that record.

6.3. Consuming key-value based records using overrides

List<KeyValue<String, Long>> consumedRecords = kafka.read(ReadKeyValues
    .from("test-topic-value-types", Long.class)
    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class));

Since we are interested in consuming records that use Long-based values, we have to parameterize the ReadKeyValues request such that the proper type is bound and a compatible Deserializer is used. This is all done by calling from(String, Class<V>) and overriding the default deserializer using with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class).

6.4. Working with attached headers

Headers headersOfFirstRecord = kafka.read(ReadKeyValues.from("test-topic"))
        .stream()
        .findFirst()
        .map(KeyValue::getHeaders)
        .orElseThrow(() -> new RuntimeException("No records found."));

The example grabs the Headers of the first record that it reads. Headers is a class that comes from the Kafka Client API. See its Javadoc for a thorough explanation of its public interface.

6.5. Consuming values or records transactionally

List<String> consumedValues = kafka.readValues(ReadKeyValues
    .from("test-topic")
    .with(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"));

Consuming records that have been written transactionally is just a matter of configuring the underlying KafkaConsumer. As ReadKeyValues provides full access to the configuration of the KafkaConsumer, a simple with(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") suffices to enable transactional consume semantics.

6.6. Observing a topic until N values have been consumed

List<String> observedValues = kafka.observeValues(ObserveKeyValues.on("test-topic", 3));

Sometimes you are not interested in reading all available values from a topic, but want to block test execution until a certain number of values have been read from a topic. This enables you to synchronize your test logic with the system-under-test, as the test blocks until the system-under-test has been able to write its records to the Kafka topic you are currently observing.

Of course, observing a topic cannot run indefinitely and thus has to be parameterized with a timeout. There is a default timeout which should be sensible for most usage scenarios. However, if you need to observe a topic for a longer amount of time, you can easily parameterize the ObserveKeyValues request using observeFor(int, TimeUnit).

If the timeout elapses before the desired number of values have been read from the given topic, the observe method will throw an AssertionError. Hence, if you are just interested in the fact that records have been written by the system-under-test to the topic you are observing, it fully suffices to use

kafka.observeValues(ObserveKeyValues.on("test-topic", 3));

and let the timeout elapse to fail the test.

If you are interested in the observed values, you can simply grab all records, as shown above, and perform additional assertions on them.
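The blocking behavior described in this section can be sketched independently of Kafka. In the following hypothetical example, a BlockingQueue plays the role of the observed topic. The class ObserveSketch and its method signature are assumptions made for illustration and do not mirror the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: a BlockingQueue takes the place of the observed topic.
final class ObserveSketch {

    // Blocks until the expected number of values has arrived or the timeout elapses.
    static <V> List<V> observe(BlockingQueue<V> topic, int expected, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        List<V> observed = new ArrayList<>();
        while (observed.size() < expected) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                throw new AssertionError(
                        "Expected " + expected + " values, but observed only " + observed.size());
            }
            try {
                // Wait for the next value, but never longer than the remaining time budget.
                V next = topic.poll(remaining, TimeUnit.NANOSECONDS);
                if (next != null) {
                    observed.add(next);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while observing", e);
            }
        }
        return observed;
    }
}
```

The sketch returns as soon as the expected number of values is available, so a fast system-under-test does not pay for the full timeout. Only the failure case waits until the deadline.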

6.7. Observing a topic until N records have been consumed

List<KeyValue<String, String>> observedRecords = kafka.observe(ObserveKeyValues.on("test-topic", 3));

This works just like the previous example, but instead of a List<String> it returns a List<KeyValue<String, String>>.

6.8. Using key filters when consuming or observing a topic

Predicate<String> keyFilter = k -> Integer.parseInt(k) % 2 == 0;

List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
    .from("test-topic", Integer.class)
    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
    .filterOnKeys(keyFilter));

It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a key filter. The key filter is modelled using java.util.function.Predicate and thus can be arbitrarily complex. The default key filter evaluates to true every time, so unless you explicitly provide a filter using filterOnKeys like in the example, all consumed records pass the filter.

In this example, the filter is quite simple: it parses the key of the record into an Integer and checks if it is evenly divisible by 2. So, if the topic from which we read contains the keys "1", "2", "3" and "4", only those records with keys "2" and "4" would pass the filter.

Applying a key filter is also possible when observing a topic.

Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.

6.9. Using value filters when consuming or observing a topic

Predicate<Integer> valueFilter = v -> v > 2;

List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
    .from("test-topic", Integer.class)
    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
    .filterOnValues(valueFilter));

It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a value filter. Like the key filter, the value filter is also modelled using java.util.function.Predicate. The default value filter evaluates to true every time, so unless you explicitly provide a filter using filterOnValues like in the example, all consumed records pass the filter.

In this example, the filter only lets those records pass for which the associated Integer-based record value is larger than 2. So, if the topic holds records with values 1, 2 and 3, only the record with value 3 would pass the filter.

Applying a value filter is also possible when observing a topic.

Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.

6.10. Using header filters when consuming or observing a topic

Predicate<Headers> headersFilter = headers -> new String(headers.lastHeader("aggregate").value()).equals("a");

List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
    .from("test-topic-header-filter", Integer.class)
    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
    .filterOnHeaders(headersFilter));

It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a headers filter. Like key and value filters, it is modelled using a java.util.function.Predicate on the target type org.apache.kafka.common.header.Headers. The default headers filter evaluates to true every time, so unless you explicitly provide a filter using filterOnHeaders like in the example, all consumed records pass the filter.

In this example, the filter only lets those records pass for which the header aggregate is set to the String a.

Applying a header filter is also possible when observing a topic.

Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.
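The AND semantics of combined filters can be sketched with plain predicates. The example below is hypothetical and library-independent: the class FilterSketch, its nested Rec type and the representation of headers as a Map<String, String> are simplifications introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical model of combined filters; headers are simplified to a String map.
final class FilterSketch {

    // Stand-in for a consumed record.
    static final class Rec {
        final String key;
        final Integer value;
        final Map<String, String> headers;

        Rec(String key, Integer value, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    // A record is returned only if it passes the key, the value and the header filter.
    static List<Rec> filter(List<Rec> records,
                            Predicate<String> keyFilter,
                            Predicate<Integer> valueFilter,
                            Predicate<Map<String, String>> headerFilter) {
        List<Rec> passed = new ArrayList<>();
        for (Rec record : records) {
            if (keyFilter.test(record.key)
                    && valueFilter.test(record.value)
                    && headerFilter.test(record.headers)) {
                passed.add(record);
            }
        }
        return passed;
    }
}
```

Passing a predicate that always returns true for one of the three arguments corresponds to the default filter described above, so the remaining filters decide on their own.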

6.11. Obtaining metadata per-record

An instance of KeyValue is associated with the optional type KeyValueMetadata. By default, this type is not set and thus KeyValue::getMetadata returns Optional.empty. Both ReadKeyValues and ObserveKeyValues provide a method called includeMetadata that explicitly enables metadata on a per-record basis. The listing underneath demonstrates this:

List<KeyValue<String, String>> records = kafka.observe(ObserveKeyValues
    .on("test-topic", 3)
    .includeMetadata());

In this example, all instances of KeyValue feature an instance of Optional<KeyValueMetadata> which contains metadata for the respective record. Metadata is currently limited to the coordinates of the record and thus closes over the 3-tuple (topic, partition, offset).

6.12. Seeking to a dedicated offset of a topic-partition

Consuming data continuously from topics that contain a huge amount of data may take quite some time if a new consumer instance always starts to read from the beginning of the topic. To speed things up, you can skip to a dedicated offset for topic-partitions and start reading from there. The example underneath demonstrates how this is done when reading key-values using ReadKeyValues.

List<KeyValue<String, String>> records = kafka.read(ReadKeyValues
    .from("test-topic")
    .seekTo(0, 2));

Seeking is also a feature of ObserveKeyValues, which means that seeking to a dedicated offset is possible for all operations that a RecordConsumer provides.
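The effect of seeking can be sketched with an in-memory model in which each partition is a list and the list index stands for the offset. The class SeekSketch is hypothetical. It also assumes that the two arguments of seekTo in the listing above denote the partition and the offset, in that order.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical model: a topic is a map from partition number to its ordered values.
final class SeekSketch {

    // Returns the values of the given partition, starting at the given offset.
    static <T> List<T> readFrom(Map<Integer, List<T>> topic, int partition, long offset) {
        List<T> partitionLog = topic.getOrDefault(partition, Collections.<T>emptyList());
        // Clamp the offset so that it always points into the partition or to its end.
        int start = (int) Math.min(Math.max(offset, 0), partitionLog.size());
        return partitionLog.subList(start, partitionLog.size());
    }
}
```

With partition 0 holding the values "a", "b", "c" and "d", reading from offset 2 skips "a" and "b" and returns "c" and "d".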

7. Managing topics

Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for managing Kafka topics. Have a look at the TopicManager interface (Javadoc omitted for brevity).

public interface TopicManager {
    void createTopic(TopicConfig config);
    void deleteTopic(String topic);
    boolean exists(String topic);
    Map<Integer, LeaderAndIsr> fetchLeaderAndIsr(String topic);
    Properties fetchTopicConfig(String topic);
}

Implementations of the TopicManager interface currently use the AdminClient implementation of the Kafka Client library for topic management.

All operations are executed synchronously.

7.1. Creating a topic

kafka.createTopic(TopicConfig.withName("test-topic"));

By default, Kafka for JUnit enables the automatic creation of topics at the broker with defaults that should be sensible for local testing. However, if you need a topic with a replication factor or number of partitions that deviates from the default setting, you should create that topic with the respective settings before writing the first Kafka record to it.

7.2. Deleting a topic

kafka.deleteTopic("test-topic");

Deleting a topic will only set a deletion marker for that topic. The topic may not be deleted immediately after deleteTopic completes.

7.3. Determine whether a topic exists

kafka.exists("test-topic");

Returns true even if the topic is marked for deletion.
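Because deletion is not immediate, a test that depends on the topic being gone may want to poll instead of asserting right away. The helper below is a hypothetical, library-independent sketch of such a polling loop. The class name AwaitSketch and the polling interval of 10 milliseconds are arbitrary choices.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not part of Kafka for JUnit.
final class AwaitSketch {

    // Polls the condition until it holds or the timeout elapses; returns whether it held.
    static boolean await(BooleanSupplier condition, long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report that the condition was not met.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A call such as AwaitSketch.await(() -> !kafka.exists("test-topic"), 10, TimeUnit.SECONDS) would then wait for the topic to disappear. Whether and how quickly that happens depends on the broker, so treat the timeout as an assumption to tune for your environment.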

7.4. Retrieving the leader and the In-Sync-Replica Set (ISR)

In case you have multiple brokers running and want to query their assignments and roles for a specific topic, you can use TopicManager#fetchLeaderAndIsr to retrieve that kind of information. The method returns an unmodifiable java.util.Map of LeaderAndIsr instances by their designated partition. The listing underneath shows how to retrieve this information for the topic named test-topic.

Map<Integer, LeaderAndIsr> leaderAndIsr = kafka.fetchLeaderAndIsr("test-topic");

The type LeaderAndIsr is not to be confused with the same type in package kafka.api. The LeaderAndIsr implementation of Kafka for JUnit is a simple transfer object that only contains the ID of the leader node and the IDs of all nodes that comprise the ISR.

7.5. Retrieving the topic configuration remotely

Looking up the topic configuration by accessing the cluster is easily done using the TopicManager.

Properties topicConfig = kafka.fetchTopicConfig("test-topic");

8. License

This work is released under the terms of the Apache 2.0 license.