Integrating spring-security with spring-kafka

8:02 PM 0 Comments

It's not uncommon for a message on a bus to have a user as part of its metadata. In my particular example at my workplace, we had the following very simple use case:

A user creates a task in our system (not unlike a task in, say, Todoist), and the creation of that task is written to a Kafka topic for propagation to other systems. One service consumes this topic to translate each message into an Elasticsearch record. That service finds it useful to know which user in our application created the task.

To achieve this, we at least need to have user information in the message. And it would be nice for the platform to take care of this concern for us.

In our case, the currently logged in user is available through the Spring Security API, so ideally, we'd configure Spring Kafka to read the user from and write the user to the Spring Security SecurityContext with producing and consuming messages.

Spring Kafka makes this simple.

Augmenting Kafka Messages with the Logged In User

First, we need to add the logged in user for each Kafka message. The way we did this was by extending MessagingMessageConverter:

public SpringSecurityAwareMessagingMessageConverter
    extends MessagingMessageConverter {
  protected Object convertPayload(Message message) {
    String payload = (String)super.convertPayload(message);
    Authentication auth =
    if ( auth != null && auth.isAuthenticated() ) {
      return new AsUser(payload, auth);
    } else {
      return new AsUser(payload, null);

This converter wraps the payload on the way out in an envelope that contains the user as pulled from the Security Context. While we probably don't want to simply throw the entire Authentication object in our message, I've done it here just to keep the code simple.

Setting Up a Security Context Based on each Kafka Message

Second, we need to unwrap the message. We can do this also in the same message converter, placing it in the Security Context:

protected Object extractAndConvertValue
    (ConsumerRecord record, Type type) {
    Object value = super.extractAndConvertValue(record, type);
    if ( value instanceof AsUser ) {

      UsernamePasswordAuthenticationToken token =
        new UsernamePasswordAuthenticationToken
          (((AsUser)value).getUser(), null, new ArrayList<>());


      return ((AsUser)value).getMessage();
    return value;

We also do some cleanup once the method invocation is completed in case the same thread is used to process another message:

public SpringSecurityAwareMessageHandlerFactory
    extends DefaultMessageHandlerFactory {
  public InvocableHandlerMethod
    createInvocableHandlerMethod(Object bean, Method method) {
    InvocableHandlerMethod m =
      new InvocableHandlerMethod(bean, method) {
        public Object invoke
          (Message message, Object... providedArgs)
          throws Exception {
          try {
            return super.invoke(message, providedArgs);
          } finally {

    HandlerMethodArgumentResolverComposite handlers =
      new HandlerMethodArgumentResolverComposite();



    return m;

The nice thing about this approach is that these can be used to abstract away the transport of the user. Also, consumers can reuse or otherwise exercise code that uses the SecurityContextHolder to derive who the user is.

Never Trust the Client

Of course, there are problems with this approach. Hypothetically, anyone with access to the Kafka cluster can write messages to this topic and claim a user. If this is a concern, then we can use a claim-based approach to transmit, say, a signed JWT as the user, which, depending on the needs of the consumer, can be used to validate against the issuer. This will definitely slow down processing, so you'll have to weigh the benefits.

I've posted the code as an example in my Github repo. Enjoy!

Josh Cummings

"I love to teach, as a painter loves to paint, as a singer loves to sing, as a musician loves to play" - William Lyon Phelps