NATS Module
Note
This module is INCUBATING. While it is ready for use and operational in the current version of Testcontainers, it is possible that it may receive breaking changes in the future. See our contributing guidelines for more information on our incubating modules policy.
NATS is a simple, secure and high performance open source messaging system for cloud native applications, IoT messaging, and microservices architectures.
Example
Create a NatsContainer to use it in your tests:
package org.testcontainers.nats;
import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamManagement;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import org.junit.jupiter.api.Test;
import org.testcontainers.utility.DockerImageName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
class NatsContainerTest {
private static final DockerImageName NATS_IMAGE = DockerImageName.parse("nats:2.12.1");
@Test
void shouldStartNATSContainer() {
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE)) {
natsContainer.start();
assertThat(natsContainer.getClientPort()).isNotNull();
assertThat(natsContainer.getRoutingPort()).isNotNull();
assertThat(natsContainer.getHttpMonitoringPort()).isNotNull();
assertThat(natsContainer.getConnectionUrl())
.isEqualTo(String.format("nats://%s:%d", natsContainer.getHost(), natsContainer.getClientPort()));
assertThat(natsContainer.getHttpMonitoringUrl())
.isEqualTo(
String.format("http://%s:%d", natsContainer.getHost(), natsContainer.getHttpMonitoringPort())
);
}
}
@Test
void shouldPublishAndSubscribeMessages() throws IOException, InterruptedException, TimeoutException {
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE)) {
natsContainer.start();
String subject = "test-subject";
String message = "Hello NATS!";
Options options = new Options.Builder().server(natsContainer.getConnectionUrl()).build();
try (Connection nc = Nats.connect(options)) {
// Subscribe
io.nats.client.Subscription subscription = nc.subscribe(subject);
nc.flush(Duration.ofSeconds(1));
// Publish
nc.publish(subject, message.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1));
// Receive
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
}
}
}
@Test
void shouldSupportJetStream() throws Exception {
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE).withJetStream()) {
natsContainer.start();
Options options = new Options.Builder().server(natsContainer.getConnectionUrl()).build();
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream
StreamConfiguration streamConfig = StreamConfiguration
.builder()
.name("test-stream")
.subjects("test.>")
.storageType(StorageType.Memory)
.build();
jsm.addStream(streamConfig);
// Get JetStream context
JetStream js = nc.jetStream();
// Publish a message
String subject = "test.foo";
String message = "JetStream test message";
js.publish(subject, message.getBytes(StandardCharsets.UTF_8));
// Subscribe and receive
io.nats.client.JetStreamSubscription subscription = js.subscribe(subject);
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
msg.ack();
}
}
}
@Test
void shouldSupportAuthentication() throws IOException, InterruptedException, TimeoutException {
String username = "testuser";
String password = "testpassword";
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE).withAuth(username, password)) {
natsContainer.start();
Options options = new Options.Builder()
.server(natsContainer.getConnectionUrl())
.userInfo(username, password)
.build();
try (Connection nc = Nats.connect(options)) {
assertThat(nc.getStatus()).isEqualTo(Connection.Status.CONNECTED);
// Test basic pub/sub
String subject = "auth-test";
String message = "Authenticated message";
io.nats.client.Subscription subscription = nc.subscribe(subject);
nc.flush(Duration.ofSeconds(1));
nc.publish(subject, message.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1));
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
}
}
}
@Test
void shouldExposeCorrectPorts() {
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE)) {
natsContainer.start();
assertThat(natsContainer.getExposedPorts()).contains(4222, 6222, 8222);
assertThat(natsContainer.getLivenessCheckPortNumbers())
.containsExactlyInAnyOrder(
natsContainer.getMappedPort(4222),
natsContainer.getMappedPort(6222),
natsContainer.getMappedPort(8222)
);
}
}
}
Now your tests can connect to NATS using the connection URL:
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE)) {
natsContainer.start();
String subject = "test-subject";
String message = "Hello NATS!";
Options options = new Options.Builder().server(natsContainer.getConnectionUrl()).build();
try (Connection nc = Nats.connect(options)) {
// Subscribe
io.nats.client.Subscription subscription = nc.subscribe(subject);
nc.flush(Duration.ofSeconds(1));
// Publish
nc.publish(subject, message.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1));
// Receive
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
}
}
Options
Using JetStream
JetStream is NATS' built-in distributed persistence system. You can enable it easily:
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE).withJetStream()) {
natsContainer.start();
Options options = new Options.Builder().server(natsContainer.getConnectionUrl()).build();
try (Connection nc = Nats.connect(options)) {
JetStreamManagement jsm = nc.jetStreamManagement();
// Create a stream
StreamConfiguration streamConfig = StreamConfiguration
.builder()
.name("test-stream")
.subjects("test.>")
.storageType(StorageType.Memory)
.build();
jsm.addStream(streamConfig);
// Get JetStream context
JetStream js = nc.jetStream();
// Publish a message
String subject = "test.foo";
String message = "JetStream test message";
js.publish(subject, message.getBytes(StandardCharsets.UTF_8));
// Subscribe and receive
io.nats.client.JetStreamSubscription subscription = js.subscribe(subject);
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
msg.ack();
}
}
Using Authentication
You can configure NATS to require username and password authentication:
String username = "testuser";
String password = "testpassword";
try (NatsContainer natsContainer = new NatsContainer(NATS_IMAGE).withAuth(username, password)) {
natsContainer.start();
Options options = new Options.Builder()
.server(natsContainer.getConnectionUrl())
.userInfo(username, password)
.build();
try (Connection nc = Nats.connect(options)) {
assertThat(nc.getStatus()).isEqualTo(Connection.Status.CONNECTED);
// Test basic pub/sub
String subject = "auth-test";
String message = "Authenticated message";
io.nats.client.Subscription subscription = nc.subscribe(subject);
nc.flush(Duration.ofSeconds(1));
nc.publish(subject, message.getBytes(StandardCharsets.UTF_8));
nc.flush(Duration.ofSeconds(1));
Message msg = subscription.nextMessage(Duration.ofSeconds(5));
assertThat(msg).isNotNull();
assertThat(new String(msg.getData(), StandardCharsets.UTF_8)).isEqualTo(message);
}
}
Enabling Debug/Trace Logging
For debugging purposes, you can enable verbose logging:
NatsContainer nats = new NatsContainer(DockerImageName.parse("nats:2.10"))
.withDebug() // Enable debug logging
.withProtocolTracing(); // Enable protocol tracing
Accessing Monitoring
NATS provides an HTTP monitoring endpoint that you can access:
String httpMonitoringUrl = natsContainer.getHttpMonitoringUrl();
Adding this module to your project dependencies
Add the following dependency to your pom.xml/build.gradle file:
testImplementation "org.testcontainers:testcontainers-nats:2.0.2"
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-nats</artifactId>
<version>2.0.2</version>
<scope>test</scope>
</dependency>