Propagating MDC Context Across REST Endpoints and Kafka Consumers/Producers
In this post, we will explore how to propagate the MDC (Mapped Diagnostic Context) across REST endpoints built on the JAX-RS API standard and across Kafka consumers/producers, in both Quarkus and Spring Boot services.
Motivation
Why should we propagate the MDC context across REST endpoints and Kafka?
There are multiple use cases, but one of the most important is troubleshooting. A common scenario is writing integration tests that involve multiple REST endpoints, where we want to track which actions are triggered by each test. By propagating the MDC context, we gain better observability and debugging capabilities.
If your system already has an observability setup (e.g., using OpenTelemetry), tracing context is automatically propagated across REST and Kafka in most frameworks like Spring Boot and Quarkus. For SmallRye Reactive Messaging in Quarkus, you can check these changes to understand how it works.
Scenario
Let’s introduce the scenario that we’ll use throughout this post. We’ll have the following components:
- API service: a REST endpoint that also acts as a Kafka producer:
@Path("/api")
public class Resource {
    @Channel("out")
    Emitter<String> emitter;

    @POST
    public void submitAuthor(String payload) { // unannotated parameter = request body
        Log.info("API received message: " + payload);
        emitter.send(payload);
    }
}
- Worker service: a Kafka consumer:
@ApplicationScoped
public class MessageConsumer {
    @Incoming("in")
    void consume(String message) {
        // {0} is replaced by the message payload
        Log.infov("Received new message: `{0}`", message);
        // do something with the message
    }
}
If integration tests are poorly designed, data pollution may occur, causing random test failures. Identifying the root cause becomes difficult without proper tracking. To improve troubleshooting, we need a way to track which test triggered each action.
Creating a test context
To solve this, we introduce a test context, using the test name as a unique identifier. We pass this identifier via an HTTP header (“x-it-test”).
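On the test side, attaching the identifier is a one-line change to the request. The following is a minimal sketch using the JDK's built-in HTTP client types; the class name `ItTestRequests`, the URL, and the test name are assumptions made for illustration and do not come from the services above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: tag every request issued by a test with the test's name. */
final class ItTestRequests {

    // Header name used throughout this post.
    static final String IT_TEST_HEADER = "x-it-test";

    /** Builds a POST request that carries the test name in the x-it-test header. */
    static HttpRequest post(String url, String testName, String payload) {
        return HttpRequest.newBuilder(URI.create(url))
                .header(IT_TEST_HEADER, testName)
                .POST(HttpRequest.BodyPublishers.ofString(payload))
                .build();
    }

    public static void main(String[] args) {
        // The URL and the test name are placeholders for illustration.
        HttpRequest request = post("http://localhost:8080/api", "shouldSubmitAuthor", "hello");
        System.out.println(request.method() + " " + request.uri() + " " + request.headers().map());
    }
}
```

In a JUnit 5 test, one way to obtain the name is an injected `TestInfo` parameter (for example `getDisplayName()`), so the header value never has to be typed by hand.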
MDC Context
Instead of modifying all log traces, we propagate the “x-it-test” header using the MDC context. To log this header, we configure the logging format:
- For Quarkus:
quarkus.log.console.format=[it-test=%X{x-it-test}] %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p [%c{3.}] (%t) %s%e%n
More information here.
- For Spring Boot, in logback-spring.xml:
<appender name="ConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
        <pattern>[it-test=%X{x-it-test}] %d{ISO8601} [thread=%thread] [%-5p] [%c] %X{user}- %m%n%ex</pattern>
    </encoder>
</appender>
JAX-RS API filters
The test must add the “x-it-test” header when calling the API service. The API service then needs a request filter to store this header in the MDC context:
@Component // Spring Boot only; omit this annotation in Quarkus
@Provider
public class MdcLoggingContainerRequestFilter implements ContainerRequestFilter {
    @Override
    public void filter(ContainerRequestContext requestContext) throws IOException {
        String testName = requestContext.getHeaderString("x-it-test");
        if (testName != null) {
            MDC.put("x-it-test", testName);
        } else {
            // No header on this request: drop any value left over on a reused thread.
            MDC.remove("x-it-test");
        }
    }
}
More information about Quarkus here.
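One thing to keep in mind: in the common thread-based MDC implementations, a value stays bound to the thread until something removes it, and server threads are reused. The sketch below illustrates the effect with a plain `ThreadLocal` standing in for the MDC; it is not the real SLF4J or JBoss MDC class, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MdcCleanupSketch {

    // Stand-in for a thread-bound MDC entry; NOT the real MDC API.
    static final ThreadLocal<String> IT_TEST = new ThreadLocal<>();

    /** Simulates handling one tagged request, optionally cleaning up afterwards. */
    static void handle(String testName, boolean cleanUp, List<String> seen) {
        IT_TEST.set(testName);
        try {
            seen.add(String.valueOf(IT_TEST.get()));
        } finally {
            if (cleanUp) {
                IT_TEST.remove();
            }
        }
    }

    /** Runs a tagged task and then an untagged task on the same reused thread. */
    static List<String> observe(boolean cleanUp) {
        List<String> seen = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> handle("testA", cleanUp, seen)).get();
            // Untagged work: records whatever is still bound to the thread.
            pool.submit(() -> seen.add(String.valueOf(IT_TEST.get()))).get();
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(observe(false)); // [testA, testA]: the value leaked
        System.out.println(observe(true));  // [testA, null]: the value was removed
    }
}
```

In a JAX-RS service, the counterpart of the `finally` block could be a `ContainerResponseFilter` that calls `MDC.remove("x-it-test")`. Whether this is needed depends on how your framework binds the MDC to requests, so treat it as a precaution rather than a requirement.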
Kafka consumer/producer interceptors
The implementation varies between frameworks:
For Quarkus
Quarkus uses SmallRye Reactive Messaging for Kafka, so this section is really about the SmallRye Reactive Messaging framework rather than Quarkus itself.
We need the outgoing interceptor to propagate the header from the MDC context to the Kafka message header:
@Default
@ApplicationScoped
public class MdcOutgoingInterceptor implements OutgoingInterceptor {
    @Override
    public Message<?> beforeMessageSend(Message<?> message) {
        String test = MDC.get("x-it-test");
        // Only copy the value when the MDC actually holds one.
        if (test != null && !test.isEmpty() && message instanceof KafkaMessage kafkaMessage) {
            kafkaMessage.getHeaders().put("x-it-test", test);
        }
        return message;
    }

    @Override
    public void onMessageAck(Message<?> message) {}

    @Override
    public void onMessageNack(Message<?> message, Throwable failure) {}
}
And the incoming interceptor propagates the header from the Kafka message back into the MDC context:
@Default
@ApplicationScoped
public class MdcIncomingInterceptor implements IncomingInterceptor {
    @Override
    public Message<?> afterMessageReceive(Message<?> message) {
        if (message instanceof KafkaMessage kafkaMessage) {
            MDC.put("x-it-test", kafkaMessage.getHeaders().get("x-it-test"));
        }
        return message;
    }

    @Override
    public void onMessageAck(Message<?> message) {}

    @Override
    public void onMessageNack(Message<?> message, Throwable failure) {}
}
Important considerations:
- SmallRye Reactive Messaging supports only one IncomingInterceptor and one OutgoingInterceptor. If you have multiple implementations, only the one with the highest priority is used. I’m not sure whether this was a deliberate design choice, but after checking the implementation I don’t see a reason for such a strong limitation…
- The @Default annotation registers these interceptors for all channels. Alternatively, annotate an interceptor with @Identifier("channel-a") and it will apply only to the channel channel-a.
More information here.
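If you need several pieces of logic despite the single-interceptor limit, one possible workaround is to register a single interceptor that delegates to an ordered list of steps. The sketch below shows only the composition idea with JDK types; `CompositeStep` is a hypothetical helper, not part of SmallRye, and plain strings stand in for messages.

```java
import java.util.List;
import java.util.function.UnaryOperator;

/** Hypothetical helper: folds several per-message steps into one function. */
final class CompositeStep<M> implements UnaryOperator<M> {

    private final List<UnaryOperator<M>> steps;

    CompositeStep(List<UnaryOperator<M>> steps) {
        this.steps = List.copyOf(steps); // defensive, immutable copy
    }

    /** Applies every step in order, feeding each result into the next step. */
    @Override
    public M apply(M message) {
        M current = message;
        for (UnaryOperator<M> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        // Strings stand in for messages; real steps would add headers, metadata, etc.
        UnaryOperator<String> addTestTag = m -> m + " [x-it-test]";
        UnaryOperator<String> addAuditTag = m -> m + " [audit]";
        CompositeStep<String> composite = new CompositeStep<>(List.of(addTestTag, addAuditTag));
        System.out.println(composite.apply("payload"));
    }
}
```

In the real interceptor, `beforeMessageSend` (or `afterMessageReceive`) would simply return the result of applying the composite to the message, so the framework still sees exactly one interceptor.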
For Spring Boot
The idea is basically the same, but the interfaces change a bit.
We need the producer interceptor to propagate the header from the MDC context to the Kafka message header:
public class MdcProducerInterceptor implements ProducerInterceptor<Object, Object> {
    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        String test = MDC.get("x-it-test");
        // Only add the header when the MDC actually holds a value.
        if (test != null && !test.isEmpty()) {
            record.headers().add("x-it-test", test.getBytes(StandardCharsets.UTF_8));
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {}

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
And configure the producer to use this interceptor:
@NotNull
public static Map<String, Object> getProducerProperties(KafkaProperties kafkaProperties) {
    Map<String, Object> properties = kafkaProperties.buildProducerProperties(null);
    // ...
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MdcProducerInterceptor.class.getName());
    return properties;
}
We also need a record interceptor to propagate the header from the Kafka message back into the MDC context:
public class MdcRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
        var header = record.headers().lastHeader("x-it-test");
        if (header != null) {
            // Decode with the same charset the producer interceptor used to encode.
            MDC.put("x-it-test", new String(header.value(), StandardCharsets.UTF_8));
        }
        return record;
    }
}
And configure the consumer factory to use this interceptor:
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaContainerFactory() {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // ...
    factory.setRecordInterceptor(new MdcRecordInterceptor<>());
    return factory;
}
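Kafka header values are raw bytes, so the producer side and the consumer side have to agree on a charset. Below is a minimal sketch of a shared helper; `HeaderCodec` is a hypothetical name, and the null handling assumes that a header may be present without a value.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: one place that defines how the header value is encoded. */
final class HeaderCodec {

    /** Encodes the test name the way the producer interceptor writes it. */
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a raw header value; returns null when the header carries no value. */
    static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A non-ASCII test name survives the round trip because both sides use UTF-8.
        String original = "shouldCr\u00e9erAuteur";
        System.out.println(original.equals(decode(encode(original))));
    }
}
```

Relying on the platform default charset on either side usually works for ASCII test names, but it can silently corrupt other characters when the two services run with different defaults.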
Conclusion
By propagating the MDC context across REST endpoints and Kafka consumers/producers, we enhance observability and troubleshooting. This approach enables us to track integration test executions and debug issues more efficiently. Whether using Quarkus or Spring Boot, implementing this mechanism ensures a consistent tracing context across distributed services.