testcontainers-extensions

Testcontainers Extensions Kafka

Testcontainers Kafka Extension with advanced testing capabilities.

Features:

Dependency :rocket:

Gradle

testImplementation "io.goodforgod:testcontainers-extensions-kafka:0.12.1"

Maven

<dependency>
    <groupId>io.goodforgod</groupId>
    <artifactId>testcontainers-extensions-kafka</artifactId>
    <version>0.12.1</version>
    <scope>test</scope>
</dependency>

Kafka Client

Kafka Client must be on the classpath. If it is not already there, add it:

Gradle

testRuntimeOnly "org.apache.kafka:kafka-clients:3.5.1"

Maven

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
    <scope>test</scope>
</dependency>

Usage

A test that starts the container in PER_RUN mode and resets the topic per test method looks like this:

@TestcontainersKafka(mode = ContainerMode.PER_RUN,
        topics = @Topics(value = "my-topic-name", reset = Topics.Mode.PER_METHOD))
class ExampleTests {

  @ConnectionKafka 
  private KafkaConnection connection;
  
  @Test
  void test() {
    var consumer = connection.subscribe("my-topic-name");
    connection.send("my-topic-name", Event.ofValue("value1"), Event.ofValue("value2"));
    consumer.assertReceivedAtLeast(2, Duration.ofSeconds(5));
  }
}

Connection

KafkaConnection is an abstraction for sending and asserting data in the Kafka container and for accessing the container connection settings. You can inject a connection via @ConnectionKafka as a field or method argument, or create it manually from a container or from explicit settings.

KafkaConnection allows you to create consumers and send messages to Kafka for easier testing and asserting.

class ExampleTests {

  private static final KafkaContainer container = new KafkaContainer();

  @Test
  void test() {
      container.start();
      KafkaConnection connection = KafkaConnection.forContainer(container);

      var consumer = connection.subscribe("my-topic-name");
      connection.send("my-topic-name", Event.ofValue("value1"), Event.ofValue("value2"));
      consumer.assertReceivedAtLeast(2, Duration.ofSeconds(5));
  }
}

Annotation

@TestcontainersKafka - automatically starts a container with the specified image in different modes, with no need to configure it manually.

Available container modes:

A simple example that starts a container per test class; no container configuration is needed:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS)
class ExampleTests {

    @Test
    void test() {
        // test
    }
}

That’s all you need.

You can customize the image with the image parameter of the annotation.

The image can also be provided via an environment variable:

@TestcontainersKafka(image = "${MY_IMAGE_ENV|confluentinc/cp-kafka:7.7.1}")
class ExampleTests {

    @Test
    void test() {
        // test
    }
}
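
The `${MY_IMAGE_ENV|confluentinc/cp-kafka:7.7.1}` value above reads as "take the image from MY_IMAGE_ENV, otherwise fall back to the value after |". The sketch below illustrates that rule in plain Java. It is not the extension's actual implementation: the ImagePlaceholder class, its resolve method, and the behaviour when the variable is missing and no default is given are assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of testcontainers-extensions. */
final class ImagePlaceholder {

    private ImagePlaceholder() {}

    /**
     * Resolves "${ENV}" or "${ENV|default}" against the given environment.
     * Any other value is treated as a static image name and returned unchanged.
     */
    static Optional<String> resolve(String value, Map<String, String> env) {
        if (!value.startsWith("${") || !value.endsWith("}")) {
            return Optional.of(value); // static image, e.g. "confluentinc/cp-kafka:7.7.1"
        }

        String body = value.substring(2, value.length() - 1);
        int separator = body.indexOf('|');
        String name = (separator < 0) ? body : body.substring(0, separator);
        String fallback = (separator < 0) ? null : body.substring(separator + 1);

        String fromEnv = env.get(name);
        if (fromEnv != null && !fromEnv.isBlank()) {
            return Optional.of(fromEnv); // the environment variable wins when it is set
        }
        // Assumed behaviour: empty result when the variable is missing and no default was given
        return Optional.ofNullable(fallback);
    }

    public static void main(String[] args) {
        String image = "${MY_IMAGE_ENV|confluentinc/cp-kafka:7.7.1}";
        System.out.println(resolve(image, System.getenv()).orElse("<unresolved>"));
    }
}
```

Passing the environment as a Map keeps the sketch easy to exercise in a unit test without touching real environment variables.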

Image syntax:

Manual Container

When you need to configure the container manually with specific options, you can provide a container instance for @TestcontainersKafka to use. To do this, mark the container with the @ContainerKafka annotation.

Example:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS)
class ExampleTests {

    @ContainerKafka
    private static final KafkaContainer container = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.1"));

    @Test
    void checkParams(@ConnectionKafka KafkaConnection connection) {
        assertNotNull(connection.params().bootstrapServers());
        assertNotNull(connection.params().properties());
    }
}

Network

To enable Network.SHARED for containers, set the shared parameter of the network attribute in the annotation:

@TestcontainersKafka(network = @Network(shared = true))
class ExampleTests {

    @Test
    void test() {
        // test
    }
}

An alias is created by default even if none is specified (the exact value depends on the implementation).

You can also provide a custom alias for the container. The alias can be read from an environment variable, and a default value can be provided in case the variable is missing.

If the specified environment variable is missing, the default alias is used:

@TestcontainersKafka(network = @Network(alias = "${MY_ALIAS_ENV|my_default_alias}"))
class ExampleTests {

    @Test
    void test() {
        // test
    }
}

Alias syntax:

Annotation Topics

You can configure topics to be created right after the Kafka container starts (or before the test class starts if ContainerMode is PER_RUN). A topic is created only if it does not exist yet. This is useful when topics must exist before the tested application starts and connects to Kafka, especially for consumers.

Example:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS, topics = @Topics("my-topic"))
class ExampleTests {

}

It is also possible to reset topics if needed per test class or per test method:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS, 
                     topics = @Topics(value = "my-topic", reset = Topics.Mode.PER_METHOD))
class ExampleTests {

}

Annotation Connection

KafkaConnection can be injected into a field or method parameter via the @ConnectionKafka annotation and used to communicate with the running container. It provides the Kafka properties and the ability to send events to Kafka or consume events from Kafka for easier testing.

Annotation Properties

You can provide custom properties to @ConnectionKafka. They are applied to the Producers and Consumers created during tests.

@TestcontainersKafka(mode = ContainerMode.PER_CLASS, image = "confluentinc/cp-kafka:7.7.1")
class ExampleTests {

    @ConnectionKafka(properties = {"enable.auto.commit", "true"})
    private KafkaConnection connection;
}
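
In the example above, the properties attribute is a flat string array in which "enable.auto.commit" is followed by "true", which reads as alternating key and value entries. The sketch below shows how such an array could be turned into java.util.Properties. The PropertyPairs class, its toProperties method, and the rejection of odd-length arrays are assumptions for illustration, not the extension's own code.

```java
import java.util.Properties;

/** Hypothetical helper for illustration; not part of testcontainers-extensions. */
final class PropertyPairs {

    private PropertyPairs() {}

    /** Converts {"key1", "value1", "key2", "value2"} into Properties. */
    static Properties toProperties(String... pairs) {
        if (pairs.length % 2 != 0) {
            // Assumed behaviour: a key without a value is treated as a configuration mistake
            throw new IllegalArgumentException(
                    "Expected key/value pairs but got " + pairs.length + " elements");
        }

        Properties properties = new Properties();
        for (int i = 0; i < pairs.length; i += 2) {
            properties.setProperty(pairs[i], pairs[i + 1]);
        }
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = toProperties(
                "enable.auto.commit", "true",
                "auto.offset.reset", "earliest");
        System.out.println(properties.getProperty("enable.auto.commit")); // prints "true"
    }
}
```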

External Connection

If you want tests to use an external Kafka instance that runs in CI or elsewhere (for example, due to Docker limitations), you can set special environment variables. The extension uses them to build the connection, and no Kafka containers are started in that case.

Special environment variables:

Conversion rules for the EXTERNAL_TEST_KAFKA_ prefix: cut the prefix, convert the rest to lower case, and replace _ with a dot (.). For example, the environment variable EXTERNAL_TEST_KAFKA_AUTO_OFFSET_RESET is converted to auto.offset.reset.
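
The conversion rule can be sketched in a few lines of plain Java. This is an illustration of the rule as described, not the extension's source: the ExternalKafkaEnv class and its method names are hypothetical, and EXTERNAL_TEST_KAFKA_MAX_POLL_RECORDS is only a sample variable chosen for the demo.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of testcontainers-extensions. */
final class ExternalKafkaEnv {

    static final String PREFIX = "EXTERNAL_TEST_KAFKA_";

    private ExternalKafkaEnv() {}

    /** Cuts the prefix, lower-cases the rest, and replaces '_' with '.'. */
    static String toPropertyKey(String envName) {
        return envName.substring(PREFIX.length())
                .toLowerCase(Locale.ROOT)
                .replace('_', '.');
    }

    /** Collects every prefixed variable from the given environment as a Kafka-style property. */
    static Properties toProperties(Map<String, String> env) {
        Properties properties = new Properties();
        env.forEach((name, value) -> {
            if (name.startsWith(PREFIX)) {
                properties.setProperty(toPropertyKey(name), value);
            }
        });
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of(
                "EXTERNAL_TEST_KAFKA_AUTO_OFFSET_RESET", "earliest",
                "EXTERNAL_TEST_KAFKA_MAX_POLL_RECORDS", "10", // sample variable for the demo
                "HOME", "/root");                             // ignored: no prefix
        System.out.println(toProperties(env).getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```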

Producer

You can easily send events to any topic (if the topic does not exist before sending, it is created automatically).

Example:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS, image = "confluentinc/cp-kafka:7.7.1")
class ExampleTests {

    @ConnectionKafka
    private KafkaConnection connection;

    @Test
    void test() {
        connection.send("my-topic-name", Event.ofValue("{\"name\":\"User\"}"));
    }
}

Consumer

You can easily subscribe to and consume events from any topic (if the topic does not exist before subscribing, it is created automatically).

Example:

@TestcontainersKafka(mode = ContainerMode.PER_CLASS, image = "confluentinc/cp-kafka:7.7.1")
class ExampleTests {

    @ConnectionKafka
    private KafkaConnection connection;

    @Test
    void test() {
        // given
        var consumer = connection.subscribe("my-topic-name");
        
        // when
        connection.send("my-topic-name", Event.ofValue("value1"), Event.ofValue("value2"));
        
        // then
        consumer.assertReceivedAtLeast(2, Duration.ofSeconds(5));
    }
}
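
The call consumer.assertReceivedAtLeast(2, Duration.ofSeconds(5)) in the example reads as "wait up to five seconds until at least two events have arrived, otherwise fail the test". The sketch below shows that wait-then-assert pattern with JDK classes only, using a BlockingQueue as a stand-in for the consumer. The ReceivedAssertions class is hypothetical and is not how the extension implements the check.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of testcontainers-extensions. */
final class ReceivedAssertions {

    private ReceivedAssertions() {}

    /** Waits until at least {@code expected} events arrive or the timeout expires, then asserts. */
    static <T> List<T> assertReceivedAtLeast(BlockingQueue<T> source, int expected, Duration timeout) {
        List<T> received = new ArrayList<>();
        long deadline = System.nanoTime() + timeout.toNanos();

        while (received.size() < expected) {
            // Only wait for the time that is left until the overall deadline
            long remaining = Math.max(0, deadline - System.nanoTime());
            T event;
            try {
                event = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for events", e);
            }
            if (event == null) {
                break; // timed out before enough events arrived
            }
            received.add(event);
        }

        if (received.size() < expected) {
            throw new AssertionError("Expected at least " + expected + " events but received "
                    + received.size() + " within " + timeout);
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("value1");
        queue.add("value2");
        System.out.println(assertReceivedAtLeast(queue, 2, Duration.ofSeconds(5))); // prints "[value1, value2]"
    }
}
```

The method returns as soon as enough events are available, so a generous timeout slows a test down only when the assertion is about to fail.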

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.