1. Introduction
Kafka for JUnit enables developers to start and stop a complete Kafka cluster comprised of Kafka brokers and distributed Kafka Connect workers from within a JUnit test. It also provides a rich set of convenient accessors to interact with such an embedded Kafka cluster in a lean and non-obtrusive way.
Kafka for JUnit can be used both to whitebox-test individual Kafka-based components of your application and to blackbox-test applications that offer an incoming and/or outgoing Kafka-based interface.
2. Using Kafka for JUnit in your tests
Kafka for JUnit provides the necessary infrastructure to exercise your Kafka-based components against an embedded Kafka cluster (cf. section Working with an embedded Kafka cluster). However, Kafka for JUnit has you covered as well if you are simply interested in using the convenient accessors against Kafka clusters that are already present in your infrastructure (cf. section Working with an external Kafka cluster).
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import net.mguenther.kafka.junit.EmbeddedKafkaCluster;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;

class KafkaTest {

    private EmbeddedKafkaCluster kafka;

    @BeforeEach
    void setupKafka() {
        kafka = provisionWith(defaultClusterConfig());
        kafka.start();
    }

    @AfterEach
    void tearDownKafka() {
        kafka.stop();
    }

    @Test
    void shouldWaitForRecordsToBePublished() throws Exception {
        kafka.send(to("test-topic", "a", "b", "c"));
        kafka.observe(on("test-topic", 3));
    }
}
This starts an embedded Kafka cluster and submits three records to the topic named test-topic. The call to kafka.observe(on("test-topic", 3)) watches that same topic for a configurable amount of time and checks if it observes the previously submitted records. If it does not, Kafka for JUnit raises an AssertionError, which fails the test. Of course, Kafka for JUnit provides many more ways to interact with a Kafka cluster.
Since EmbeddedKafkaCluster implements the AutoCloseable interface, you can achieve the same behavior using a try-with-resources construct.
import org.junit.jupiter.api.Test;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;

class KafkaTest {

    @Test
    void shouldWaitForRecordsToBePublished() throws Exception {
        try (EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig())) {
            kafka.start();
            kafka.send(to("test-topic", "a", "b", "c"));
            kafka.observe(on("test-topic", 3));
        }
    }
}
2.1. Supported versions of Apache Kafka
Version of Kafka for JUnit | Supports |
---|---|
0.1.x | Apache Kafka 1.0.0 |
0.2.x | Apache Kafka 1.0.0 |
0.3.x | Apache Kafka 1.0.0 |
1.0.x | Apache Kafka 1.1.1 |
2.0.x | Apache Kafka 2.0.0 |
2.1.x | Apache Kafka 2.1.1 |
2.2.x | Apache Kafka 2.2.1 |
2.3.x | Apache Kafka 2.3.0 |
2.4.x | Apache Kafka 2.4.0 |
2.5.x | Apache Kafka 2.5.1 |
2.6.x | Apache Kafka 2.6.0 |
2.7.x | Apache Kafka 2.7.0 |
2.8.x | Apache Kafka 2.8.0 |
3.0.x | Apache Kafka 3.0.0 |
3.1.x | Apache Kafka 3.1.0 |
3.2.x | Apache Kafka 3.2.0 |
3. Working with an embedded Kafka cluster
Kafka for JUnit is able to spin up a fully-fledged embedded Kafka cluster that is accessible via class EmbeddedKafkaCluster. EmbeddedKafkaCluster implements the interfaces RecordProducer, RecordConsumer and TopicManager and thus provides convenient accessors to interact with the cluster.
Using EmbeddedKafkaCluster in a JUnit test is quite simple. The necessary code to set it up is minimal if you are comfortable with the default configuration.
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import net.mguenther.kafka.junit.EmbeddedKafkaCluster;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;

class KafkaTest {

    private EmbeddedKafkaCluster kafka;

    @BeforeEach
    void setupKafka() {
        kafka = provisionWith(defaultClusterConfig());
        kafka.start();
    }

    @AfterEach
    void tearDownKafka() {
        kafka.stop();
    }
}
Kafka for JUnit uses the Builder pattern extensively to provide a fluent API when provisioning an embedded Kafka cluster. Let's take a closer look at method EmbeddedKafkaCluster.provisionWith. This method consumes a configuration of type EmbeddedKafkaClusterConfig. EmbeddedKafkaClusterConfig uses defaults for the Kafka broker and ZooKeeper. By default, Kafka Connect will not be provisioned at all. The builder of EmbeddedKafkaClusterConfig provides a provisionWith method as well and is overloaded to accept configurations of type EmbeddedZooKeeperConfig, EmbeddedKafkaConfig and EmbeddedConnectConfig. The following listing demonstrates how to adjust the configuration of the embedded Kafka broker with respect to the default number of partitions for newly created topics.
EmbeddedKafkaCluster kafka = provisionWith(newClusterConfig()
    .configure(brokers()
        .with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")));
The builders for those configurations provide a uniform interface for overriding defaults, comprising the two methods with(String propertyName, T value) and withAll(java.util.Properties overrides). To override a default value, simply provide the name of the configuration parameter as defined by the respective Kafka component along with the new value.
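As a brief sketch of the withAll variant: the listing underneath overrides a couple of broker settings at once. The property names used here are standard Kafka broker settings chosen for illustration.
Properties brokerOverrides = new Properties();
brokerOverrides.put("auto.create.topics.enable", "false");
brokerOverrides.put("num.partitions", "5");

EmbeddedKafkaCluster kafka = provisionWith(newClusterConfig()
    .configure(brokers()
        .withAll(brokerOverrides)));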
Using the default setting will provide you with a single embedded Kafka broker. This ought to be sufficient for most cases. However, there are scenarios which require testing against multiple brokers that form a cluster. Forming an embedded cluster with multiple brokers is done by adjusting the default provisioning of your test case. See the listing underneath for an example.
EmbeddedKafkaCluster kafka = provisionWith(newClusterConfig()
.configure(brokers()
.withNumberOfBrokers(3)
.with(KafkaConfig$.MODULE$.NumPartitionsProp(), "5")
.with(KafkaConfig$.MODULE$.DefaultReplicationFactorProp(), "3")
.with(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), "2")
.with(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), "3")
.with(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), "3")
.with(KafkaConfig$.MODULE$.TransactionsTopicMinISRProp(), "2")));
Using this configuration, we end up with a total of three brokers that form an embedded Kafka cluster, while the defaults for topic partitions and replicas have been adjusted to be consistent with the size of the cluster.
Of course, you can also use the try-with-resources pattern to fire up an embedded cluster. Have a look at the following test setup.
import org.junit.jupiter.api.Test;

import static net.mguenther.kafka.junit.EmbeddedKafkaCluster.provisionWith;
import static net.mguenther.kafka.junit.EmbeddedKafkaClusterConfig.defaultClusterConfig;
import static net.mguenther.kafka.junit.ObserveKeyValues.on;
import static net.mguenther.kafka.junit.SendValues.to;

class KafkaTest {

    @Test
    void shouldWaitForRecordsToBePublished() throws Exception {
        try (EmbeddedKafkaCluster kafka = provisionWith(defaultClusterConfig())) {
            kafka.start();
            kafka.send(to("test-topic", "a", "b", "c"));
            kafka.observe(on("test-topic", 3));
        }
    }
}
See sections on Producing records, Consuming records and Managing topics for further reference on how to interact with the cluster.
3.1. Failure Modes
EmbeddedKafkaCluster provides the means to disconnect - and re-connect, of course - specific embedded Kafka brokers. All brokers in the embedded cluster get a broker ID assigned during cluster formation. This broker ID is an Integer-based value and starts at 1. The broker ID increases by 1 for every subsequent embedded Kafka broker that is started during cluster formation.
Clusters stay fixed with respect to the maximum number of embedded brokers. But individual brokers can, given their broker ID, be disconnected from the rest of the cluster to test for failure scenarios. Such failure scenarios include:
- How does my Kafka-based component behave in the presence of broker outages?
- What happens if the In-Sync-Replica Set (ISR) of a topic that my application consumes from shrinks below its minimum size?
- Is my application able to progress after brokers re-connect and form a working cluster?
3.1.1. Disconnect and reconnect a single broker
The following listing shows how to disconnect and re-connect a certain broker, while fetching the ISR of a dedicated topic in between these operations to determine whether the cluster behaves correctly.
If you do use this feature of Kafka for JUnit, then please give the embedded cluster some time to handle broker churn. Identifying that a leader for a topic-partition is not available and conducting the leader election takes some time. In the example underneath we introduce a delay of five seconds in between operations that affect cluster membership.
kafka.createTopic(TopicConfig.withName("test-topic")
.withNumberOfPartitions(5)
.withNumberOfReplicas(3));
delay(5);
Set<Integer> leaders = kafka.fetchLeaderAndIsr("test-topic")
.values()
.stream()
.map(LeaderAndIsr::getLeader)
.collect(Collectors.toSet());
assertThat(leaders.contains(1)).isTrue();
assertThat(leaders.contains(2)).isTrue();
assertThat(leaders.contains(3)).isTrue();
kafka.disconnect(1);
delay(5);
Set<Integer> leadersAfterDisconnect = kafka.fetchLeaderAndIsr("test-topic")
.values()
.stream()
.map(LeaderAndIsr::getLeader)
.collect(Collectors.toSet());
assertThat(leadersAfterDisconnect.contains(1)).isFalse();
assertThat(leadersAfterDisconnect.contains(2)).isTrue();
assertThat(leadersAfterDisconnect.contains(3)).isTrue();
kafka.connect(1);
delay(5);
Set<Integer> leadersAfterReconnect = kafka.fetchLeaderAndIsr("test-topic")
.values()
.stream()
.map(LeaderAndIsr::getLeader)
.collect(Collectors.toSet());
assertThat(leadersAfterReconnect.contains(1)).isTrue();
assertThat(leadersAfterReconnect.contains(2)).isTrue();
assertThat(leadersAfterReconnect.contains(3)).isTrue();
3.1.2. Disconnect until In-Sync-Replica Set falls below minimum size
The following listing shows how to disconnect the In-Sync-Replica Set (ISR) for a given topic until its ISR falls below its minimum size.
If you do use this feature of Kafka for JUnit, then please give the embedded cluster some time to handle broker churn. Identifying that a leader for a topic-partition is not available and conducting the leader election takes some time. In the example underneath we introduce a delay of five seconds in between operations that affect cluster membership.
// Create a topic and configure the number of replicas as well as the size of the ISR
kafka.createTopic(TopicConfig.withName("test-topic")
.withNumberOfPartitions(5)
.withNumberOfReplicas(3)
.with("min.insync.replicas", "2"));
// Wait a bit to give the cluster a chance to properly assign topic-partitions to leaders
delay(5);
// Disconnect until the remaining number of brokers fall below the minimum ISR size
kafka.disconnectUntilIsrFallsBelowMinimumSize("test-topic");
delay(5);
// Submitting records to this topic will yield a NotEnoughReplicasException
kafka.send(SendValues.to("test-topic", "A"));
The last line of the listing shows the effect of an ISR that can no longer operate reliably. Your Kafka-based component or application would run concurrently with this test so that you are able to observe whether it behaves correctly (e.g. by checking that the component progresses normally once the ISR is restored).
3.1.3. Restoring the In-Sync-Replica Set
Restoring the In-Sync-Replica Set is easy, as method disconnectUntilIsrFallsBelowMinimumSize returns the broker IDs of all brokers that have been deactivated during the shrinking. The following listing shows how to restore the ISR.
kafka.createTopic(TopicConfig.withName("test-topic")
.withNumberOfPartitions(5)
.withNumberOfReplicas(3)
.with("min.insync.replicas", "2"));
delay(5);
Set<Integer> disconnectedBrokers = kafka.disconnectUntilIsrFallsBelowMinimumSize("test-topic");
delay(5);
// Do some testing, trigger some operations, observe the behavior of your application
kafka.connect(disconnectedBrokers);
// Give the cluster some time to assign leaders and reestablish the ISR
delay(5);
// Do some more testing ...
4. Working with an external Kafka cluster
Kafka for JUnit can be used to work with an external Kafka cluster. This is useful if you want to execute your tests against a staging/testing environment or if you already use other testing libraries (e.g. Testcontainers) that spin up a Kafka cluster on your local machine, but want to use the convenient accessors provided by Kafka for JUnit.
Class ExternalKafkaCluster integrates an external cluster. Just like EmbeddedKafkaCluster, an ExternalKafkaCluster also implements the interfaces RecordProducer, RecordConsumer and TopicManager and thus provides convenient accessors to interact with the cluster.
Using ExternalKafkaCluster in a JUnit test is easy. The listing below shows the necessary code to use ExternalKafkaCluster in combination with Testcontainers.
@Testcontainers
class ExternalKafkaClusterTest {

    // This is not part of Kafka for JUnit, but a sub-module provided
    // by Testcontainers (org.testcontainers:kafka)
    @Container
    private KafkaContainer kafkaContainer = new KafkaContainer();

    @Test
    @DisplayName("should be able to observe records written to an external Kafka cluster")
    void externalKafkaClusterShouldWorkWithExternalResources() throws Exception {

        ExternalKafkaCluster kafka = ExternalKafkaCluster.at(kafkaContainer.getBootstrapServers());

        // use the accessors that the cluster provides to interact with the Kafka container
        [...]
    }
}
See sections on Producing records, Consuming records and Managing topics for further reference on how to interact with the cluster.
5. Producing records
Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for producing new Kafka records. Have a look at the RecordProducer interface (Javadoc omitted for brevity).
public interface RecordProducer {

    <V> List<RecordMetadata> send(SendValues<V> sendRequest) throws InterruptedException;
    <V> List<RecordMetadata> send(SendValuesTransactional<V> sendRequest) throws InterruptedException;
    <K, V> List<RecordMetadata> send(SendKeyValues<K, V> sendRequest) throws InterruptedException;
    <K, V> List<RecordMetadata> send(SendKeyValuesTransactional<K, V> sendRequest) throws InterruptedException;
}
Calling send using an instance of SendValues does just that: it produces non-keyed Kafka records that only feature a value. The key of a record that has been produced this way is simply null. If you wish to associate a key, you can do so by passing an instance of SendKeyValues to the send method. Both SendValues and SendKeyValues use the Builder pattern, so creating the respective send parameterization is easy and does not pollute your test code with any kind of boilerplate.
Implementations of the RecordProducer interface use the high-level producer API that comes with Apache Kafka. Hence, the underlying producer is a KafkaProducer. This KafkaProducer is fully parameterizable via the builders of both SendValues and SendKeyValues.
All send operations are executed synchronously.
With these abstractions in place, sending content to your embedded Kafka cluster is easy. Have a look at the following examples. One thing you should notice is that you do not have to specify bootstrap.servers. Kafka for JUnit adjusts a given client configuration so that you can start off with meaningful defaults that work out-of-the-box. You only have to provide configuration overrides if it is absolutely necessary for your test.
5.1. Sending non-keyed values using defaults
kafka.send(SendValues.to("test-topic", "a", "b", "c"));
5.2. Sending non-keyed values using overrides
kafka.send(SendValues.to("test-topic", "a", "b", "c")
.with(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
.with(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"));
5.3. Sending non-keyed values transactionally
kafka.send(SendValuesTransactional
    .inTransaction("test-topic", Arrays.asList("a", "b", "c")));
The API of Kafka for JUnit has been designed with great care and readability in mind. Using static imports for factory methods shows that we can interact with the embedded Kafka cluster in a lean and readable way.
kafka.send(inTransaction("test-topic", Arrays.asList("a", "b", "c")));
5.4. Sending keyed records using defaults
List<KeyValue<String, String>> records = new ArrayList<>();
records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));
kafka.send(SendKeyValues.to("test-topic", records));
5.5. Sending keyed records using overrides
List<KeyValue<String, String>> records = new ArrayList<>();
records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));
kafka.send(SendKeyValues.to("test-topic", records)
.with(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
.with(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"));
5.6. Sending keyed records transactionally
List<KeyValue<String, String>> records = new ArrayList<>();
records.add(new KeyValue<>("aggregate", "a"));
records.add(new KeyValue<>("aggregate", "b"));
records.add(new KeyValue<>("aggregate", "c"));
kafka.send(inTransaction("test-topic", records));
5.7. Sending records or values transactionally to multiple topics
kafka.send(SendValuesTransactional
.inTransaction("test-topic-1", Arrays.asList("a", "b"))
.inTransaction("test-topic-2", Arrays.asList("c", "d")));
5.8. Failing a transaction on purpose
kafka.send(SendValuesTransactional
.inTransaction("test-topic", Arrays.asList("a", "b"))
.failTransaction());
Defining a SendValuesTransactional request with failTransaction will write records to the Kafka log, but abort the transaction they belong to. This allows you to test if your application-specific Kafka consumers adhere to the transactional guarantees they claim to satisfy, since only a correct implementation of a consumer with isolation.level set to read_committed will not see - and thus not process - those records.
This works for SendKeyValuesTransactional as well.
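To illustrate, a minimal sketch of such a check might combine a failed transactional write with a read using read_committed semantics and assert that no records are visible. The assertion style follows the AssertJ usage shown elsewhere in this guide; the topic name is illustrative.
kafka.send(SendValuesTransactional
    .inTransaction("test-topic", Arrays.asList("a", "b"))
    .failTransaction());

List<String> committedValues = kafka.readValues(ReadKeyValues
    .from("test-topic")
    .with(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"));

// Records from the aborted transaction must not be visible to a read_committed consumer.
assertThat(committedValues).isEmpty();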
5.9. Attaching record headers
KeyValue<String, String> record = new KeyValue<>("a", "b");
record.addHeader("client", "kafka-junit-test".getBytes("utf-8"));
kafka.send(SendKeyValues
.to("test-topic", Collections.singletonList(record)));
You can also pre-construct an instance of Headers and pass it along via the constructor of a KeyValue.
6. Consuming records
Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for consuming Kafka records. Have a look at the RecordConsumer interface (Javadoc omitted for brevity).
public interface RecordConsumer {

    <V> List<V> readValues(ReadKeyValues<String, V> readRequest);
    <V> List<V> observeValues(ObserveKeyValues<String, V> observeRequest) throws InterruptedException;
    <K, V> List<KeyValue<K, V>> read(ReadKeyValues<K, V> readRequest);
    <K, V> List<KeyValue<K, V>> observe(ObserveKeyValues<K, V> observeRequest) throws InterruptedException;
}
Implementations of the RecordConsumer interface use the high-level consumer API that comes with Apache Kafka. Hence, the underlying consumer is a KafkaConsumer. This KafkaConsumer is fully parameterizable via both ReadKeyValues and ObserveKeyValues by means of the provided builders.
All operations are executed synchronously.
With these abstractions in place, reading content from a Kafka topic is easy. As with a RecordProducer, there is no need to specify things like bootstrap.servers - Kafka for JUnit will provide the necessary configuration. Have a look at the following examples.
6.1. Consuming values using defaults
List<String> values = kafka.readValues(ReadKeyValues.from("test-topic"));
By default, ReadKeyValues.from uses StringDeserializer.class for both the record key and value. Calling readValues just yields the values from the consumed Kafka records. Please have a look at the next example if you are interested in obtaining not only the values, but also the record key and possibly attached headers.
6.2. Consuming key-value based records using defaults
List<KeyValue<String, String>> consumedRecords = kafka.read(ReadKeyValues.from("test-topic"));
Notice the difference to the previous example: instead of calling readValues we call read using the same ReadKeyValues request. Instead of a List<String>, this yields a List<KeyValue<String, String>>, where each KeyValue is comprised of the record key, the record value and the headers that have been attached to that record.
6.3. Consuming key-value based records using overrides
List<KeyValue<String, Long>> consumedRecords = kafka.read(ReadKeyValues
    .from("test-topic-value-types", Long.class)
    .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class));
Since we are interested in consuming records that use Long-based values, we have to parameterize the ReadKeyValues request such that the proper type is bound and a compatible Deserializer is used. This is all done by calling from(String, Class<V>) and overriding the default deserializer using with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class).
6.4. Working with attached headers
Headers headersOfFirstRecord = kafka.read(ReadKeyValues.from("test-topic"))
.stream()
.findFirst()
.map(KeyValue::getHeaders)
.orElseThrow(() -> new RuntimeException("No records found."));
The example grabs the Headers of the first record that it reads. Headers is a class that comes from the Kafka Client API. See its Javadoc for a thorough explanation of its public interface.
6.5. Consuming values or records transactionally
List<String> consumedValues = kafka.readValues(ReadKeyValues
.from("test-topic")
.with(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"));
Consuming records that have been written transactionally is just a matter of the configuration of the underlying KafkaConsumer. As ReadKeyValues provides full access to the configuration of the KafkaConsumer, a simple with(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed") suffices to enable transactional consume semantics.
6.6. Observing a topic until N values have been consumed
List<String> observedValues = kafka.observeValues(ObserveKeyValues.on("test-topic", 3));
Sometimes you are not interested in reading all available values from a topic, but want to block test execution until a certain amount of values have been read from a topic. This enables you to synchronize your test logic with the system-under-test as the test blocks until the system-under-test has been able to write its records to the Kafka topic you are currently observing.
Of course, observing a topic cannot run indefinitely and thus has to be parameterized with a timeout. There is a default timeout which should be sensible for most usage scenarios. However, if you need to observe a topic for a longer amount of time, you can easily parameterize the ObserveKeyValues request using observeFor(int, TimeUnit).
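For instance, extending the observation window to 30 seconds might look like this sketch (the duration is illustrative):
List<String> observedValues = kafka.observeValues(ObserveKeyValues
    .on("test-topic", 3)
    .observeFor(30, TimeUnit.SECONDS));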
If the timeout elapses before the desired amount of values have been read from the given topic, the observe method will throw an AssertionError. Hence, if you are just interested in the fact that records have been written by the system-under-test to the topic you are observing, it fully suffices to use
kafka.observeValues(ObserveKeyValues.on("test-topic", 3));
and let the timeout elapse to fail the test.
If you are interested in the observed values, you can however simply grab all records - like shown above - and perform additional assertions on them.
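A sketch of such an additional assertion, using AssertJ as in the earlier examples (the expected values are illustrative):
List<String> observedValues = kafka.observeValues(ObserveKeyValues.on("test-topic", 3));

assertThat(observedValues).containsExactly("a", "b", "c");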
6.7. Observing a topic until N records have been consumed
List<KeyValue<String, String>> observedValues = kafka.observe(ObserveKeyValues.on("test-topic", 3));
This is just the same as the example above, but instead of observing and returning a List<String>, it returns a List<KeyValue<String, String>>.
6.8. Using key filters when consuming or observing a topic
Predicate<String> keyFilter = k -> Integer.parseInt(k) % 2 == 0;
List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
.from("test-topic", Integer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
.filterOnKeys(keyFilter));
It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a key filter. The key filter is modelled using java.util.function.Predicate and thus can be arbitrarily complex. The default key filter evaluates to true every time, so unless you explicitly provide a filter using filterOnKeys like in the example, all consumed records pass the filter.
In this example, the filter is quite simple: it parses the key of the record into an Integer and checks if it is evenly divisible by 2. So, if - for the sake of the example - the topic from which we read contains the keys "1", "2", "3" and "4", only those records with keys "2" and "4" would pass the filter.
Applying a key filter is also possible when observing a topic; see the sketch after this note.
Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.
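A sketch of applying the same key filter while observing a topic (the expected number of records is illustrative):
Predicate<String> keyFilter = k -> Integer.parseInt(k) % 2 == 0;

List<KeyValue<String, String>> observedRecords = kafka.observe(ObserveKeyValues
    .on("test-topic", 2)
    .filterOnKeys(keyFilter));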
6.9. Using value filters when consuming or observing a topic
Predicate<Integer> valueFilter = v -> v > 2;
List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
.from("test-topic", Integer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
.filterOnValues(valueFilter));
It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a value filter. Like the key filter, the value filter is also modelled using java.util.function.Predicate. The default value filter evaluates to true every time, so unless you explicitly provide a filter using filterOnValues like in the example, all consumed records pass the filter.
In this example, the filter only lets those records pass for which the associated Integer-based record value is larger than 2. So, if the topic holds records with values 1, 2 and 3, only the record with value 3 would pass the filter.
Applying a value filter is also possible when observing a topic.
Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.
6.10. Using header filters when consuming or observing a topic
Predicate<Headers> headersFilter = headers -> new String(headers.lastHeader("aggregate").value()).equals("a");
List<KeyValue<String, Integer>> consumedRecords = kafka.read(ReadKeyValues
.from("test-topic-header-filter", Integer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class)
.filterOnHeaders(headersFilter));
It is possible to parameterize both ReadKeyValues and ObserveKeyValues with a headers filter. Like key and value filters, it is modelled using a java.util.function.Predicate on the target type org.apache.kafka.common.header.Headers. The default headers filter evaluates to true every time, so unless you explicitly provide a filter using filterOnHeaders like in the example, all consumed records pass the filter.
In this example, the filter only lets those records pass for which the header aggregate is set to the String "a".
Applying a header filter is also possible when observing a topic.
Combining key, value and header filters is possible. Please note that in this case only such records that pass all filters are returned. Hence, conjoining key, value and header filters has AND semantics.
6.11. Obtaining metadata per-record
An instance of KeyValue is associated with the optional type KeyValueMetadata. By default, this type is not set and thus KeyValue::getMetadata returns Optional.empty. Both ReadKeyValues and ObserveKeyValues provide a method called includeMetadata that explicitly enables metadata on a per-record basis. The listing underneath demonstrates this:
List<KeyValue<String, String>> records = kafka.observe(ObserveKeyValues
.on("test-topic", 3)
.includeMetadata());
In this example, all instances of KeyValue feature an instance of Optional<KeyValueMetadata> which contains metadata for the respective record. Metadata is currently limited to the coordinates of the record and thus closes over the 3-tuple (topic, partition, offset).
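The sketch below shows how such metadata could be inspected. Note that the accessor names on KeyValueMetadata (getTopic, getPartition, getOffset) are assumptions made for the sake of illustration.
List<KeyValue<String, String>> recordsWithMetadata = kafka.observe(ObserveKeyValues
    .on("test-topic", 3)
    .includeMetadata());

recordsWithMetadata.forEach(record -> record.getMetadata().ifPresent(metadata ->
    // accessor names below are assumptions for illustration purposes
    System.out.println(metadata.getTopic() + "-" + metadata.getPartition() + "@" + metadata.getOffset())));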
6.12. Seeking to a dedicated offset of a topic-partition
Consuming data continuously from topics that contain a huge amount of data may take quite some time if a new consumer instance always starts to read from the beginning of the topic. To speed things up, you can skip to a dedicated offset for topic-partitions and start reading from there. The example underneath demonstrates how this is done when reading key-values using ReadKeyValues.
List<KeyValue<String, String>> records = kafka.read(ReadKeyValues
.from("test-topic")
.seekTo(0, 2));
Seeking is also a feature of ObserveKeyValues, which means that seeking to a dedicated offset is possible for all operations that a RecordConsumer provides.
7. Managing topics
Class EmbeddedKafkaClusterRule as well as EmbeddedKafkaCluster expose convenience methods for managing Kafka topics. Have a look at the TopicManager interface (Javadoc omitted for brevity).
public interface TopicManager {

    void createTopic(TopicConfig config);
    void deleteTopic(String topic);
    boolean exists(String topic);
    Map<Integer, LeaderAndIsr> fetchLeaderAndIsr(String topic);
    Properties fetchTopicConfig(String topic);
}
Implementations of the TopicManager interface currently use the AdminClient implementation of the Kafka Client library for topic management.
All operations are executed synchronously.
7.1. Creating a topic
kafka.createTopic(TopicConfig.withName("test-topic"));
By default, Kafka for JUnit enables the automatic creation of topics at the broker with defaults that should be sensible for local testing. However, if you find yourself in the situation to create a topic with a specific replication factor or number of partitions that deviate from their default setting, you should create that topic with the respective settings before writing the first Kafka record to it.
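A sketch of creating such a topic up front, using the TopicConfig builder shown earlier in this guide (the additional topic-level property is illustrative):
kafka.createTopic(TopicConfig.withName("test-topic")
    .withNumberOfPartitions(5)
    .withNumberOfReplicas(1)
    .with("cleanup.policy", "compact"));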
7.2. Deleting a topic
kafka.deleteTopic("test-topic");
Deleting a topic will only set a deletion marker for that topic. The topic may not be deleted immediately after deleteTopic completes.
7.3. Determine whether a topic exists
kafka.exists("test-topic");
Returns true even if the topic is marked for deletion.
7.4. Retrieving the leader and the In-Sync-Replica Set (ISR)
In case you have multiple brokers running and want to query their assignments and roles for a specific topic, you can use TopicManager#fetchLeaderAndIsr to retrieve that kind of information. The method returns an unmodifiable java.util.Map of LeaderAndIsr instances by their designated partition. The listing underneath shows how to retrieve this information for the topic named test-topic.
Map<Integer, LeaderAndIsr> leaderAndIsr = kafka.fetchLeaderAndIsr("test-topic");
The type LeaderAndIsr is not to be confused with the type of the same name in package kafka.api. The LeaderAndIsr implementation of Kafka for JUnit is a simple transfer object that only contains the ID of the leader node and the IDs of all nodes that comprise the ISR.
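For example, determining the leader of a single partition might look like this sketch (the partition number is illustrative):
Map<Integer, LeaderAndIsr> leaderAndIsr = kafka.fetchLeaderAndIsr("test-topic");

// Broker ID of the leader for partition 0
int leaderOfPartitionZero = leaderAndIsr.get(0).getLeader();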
7.5. Retrieving the topic configuration remotely
Looking up the topic configuration by accessing the cluster is easily done using the TopicManager.
Properties topicConfig = kafka.fetchTopicConfig("test-topic");
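The returned java.util.Properties can be inspected like any other set of properties, for example (the property name is a standard topic-level setting):
Properties topicConfig = kafka.fetchTopicConfig("test-topic");

String cleanupPolicy = topicConfig.getProperty("cleanup.policy");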
8. License
This work is released under the terms of the Apache 2.0 license.