Wednesday, July 29, 2020

Publishing Application Events using Kafka and Spring Transaction Events

Many applications have a need to publish application level events based on some operations happening in the system.  Here is one requirement that I had recently. Basically in my tutorial system, if there is new user created, I need to publish a UserCreated event. Here is how we can accomplish this using Transaction Events provided by Springframework.
First I create a event pojo that will be used to publish the event.
package in.springframework.blog.tutorials.events;
import lombok.Builder;
import lombok.Data;
@Builder
@Data
public class UserCreatedEvent {
private long userId;
private String username;
private String fullname;
private String status;
}

Now we create an event listener.
package in.springframework.blog.tutorials.listeners;
import in.springframework.blog.tutorials.events.UserCreatedEvent;
import in.springframework.blog.tutorials.utils.MyConstants;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
@Log4j2
public class UserCreatedEventListener {
@Autowired
@Qualifier(MyConstants.FIRST_TOPIC_TEMPLATE_NAME)
private KafkaTemplate firstKafkaTemplate;
@TransactionalEventListener(phase= TransactionPhase.BEFORE_COMMIT)
void handleBeforeCommit(UserCreatedEvent userCreatedEvent) {
log.error("Before Commit " + userCreatedEvent);
sendMessage(userCreatedEvent, "BeforeCommit");
}
@TransactionalEventListener(phase= TransactionPhase.AFTER_COMMIT)
void handleAfterCommit(UserCreatedEvent userCreatedEvent) {
log.error("After Commit " + userCreatedEvent);
sendMessage(userCreatedEvent, "AfterCommit");
}
@TransactionalEventListener(phase= TransactionPhase.AFTER_ROLLBACK)
void handleAfterRollback(UserCreatedEvent userCreatedEvent) {
log.error("After Rollback " + userCreatedEvent);
sendMessage(userCreatedEvent, "AfterRollback");
}
@TransactionalEventListener(phase= TransactionPhase.AFTER_COMPLETION)
void handleAfterCompletion(UserCreatedEvent userCreatedEvent) {
log.error("After Completion " + userCreatedEvent);
sendMessage(userCreatedEvent, "AfterCompletion");
}
private void sendMessage(UserCreatedEvent userCreatedEvent, String status) {
userCreatedEvent.setStatus(status);
firstKafkaTemplate.send(MyConstants.FIRST_TOPIC, userCreatedEvent.toString()).addCallback(new ListenableFutureCallback() {
@Override
public void onFailure(Throwable throwable) {
log.info(String.format("Message '%s' failed!", userCreatedEvent.toString()));
}
@Override
public void onSuccess(Object o) {
log.info(String.format("Message '%s' sent successfully!", userCreatedEvent.toString()));
}
});
}
}

As you can see within the event listener, I have a handler method that is a hook to a particular phase in transaction commit cycle. At different phases of commit cycle, we are publishing an event to Kafka topic 'FirstTopic'

Now we create an event publisher.
package in.springframework.blog.tutorials.publishers;
import in.springframework.blog.tutorials.entities.User;
import in.springframework.blog.tutorials.events.UserCreatedEvent;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class UserEventPublisher {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void publishUserCreatedEvent(final User user) {
log.info(String.format("Publishing event for user %s with id %d", user.getUsername(), user.getId()));
applicationEventPublisher.publishEvent(UserCreatedEvent.builder()
.userId(user.getId())
.username(user.getUsername())
.fullname(user.getFullname())
.build()
);
}
}
The event publisher just constructs the event object and publishes the event.
We add a createUser method within the user service and mark it @Transactional. This is important to make sure any exceptions coming out of createUser transaction are caught and propagated appropriately.
package in.springframework.blog.tutorials.services;
import in.springframework.blog.tutorials.entities.Tenant;
import in.springframework.blog.tutorials.entities.User;
import in.springframework.blog.tutorials.mappers.UserPojoMapper;
import in.springframework.blog.tutorials.pojos.Role;
import in.springframework.blog.tutorials.pojos.UserPojo;
import in.springframework.blog.tutorials.publishers.UserEventPublisher;
import in.springframework.blog.tutorials.repositories.TenantRepository;
import in.springframework.blog.tutorials.repositories.UserRepository;
import in.springframework.blog.tutorials.utils.TutorialRequestContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Optional;
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private TenantRepository tenantRepository;
@Autowired
private PasswordEncoder passwordEncoder;
@Autowired
private UserPojoMapper userToPojoMapper;
@Autowired
private UserEventPublisher userEventPublisher;
public Iterable<User> findAll() {
return userRepository.findAll();
}
public Optional<User> findByUsername(String username) {
return userRepository.findUserByTenantAndUsername(TutorialRequestContext.currentTenant.get(), username);
}
public Optional<User> retrieveUser(String idOrUserNameOrEmail) {
Optional<Tenant> optionalTenant = tenantRepository.findById(TutorialRequestContext.currentTenant.get().getId());
try {
Long id = Long.parseLong(idOrUserNameOrEmail);
Optional<User> optionalUser = userRepository.findById(id);
if (optionalUser.isPresent()) {
return optionalUser;
}
}
catch(NumberFormatException e) {
}
Optional<User> optionalUser = userRepository.findUserByEmail(idOrUserNameOrEmail);
if (optionalUser.isPresent()) {
return optionalUser;
}
optionalUser = userRepository.findUserByUsernameAndTenant(optionalTenant.get(),
idOrUserNameOrEmail);
return optionalUser;
}
public User save(User user) {
return userRepository.save(user);
}
public void delete(Long id) {
userRepository.deleteById(id);
}
@Transactional
public Optional<UserPojo> createUser(UserPojo userPojo) {
User user = userToPojoMapper.convert(userPojo);
user.setMask(Role.USER.getMask());
user.setTenant(TutorialRequestContext.currentTenant.get());
user.setPassword(passwordEncoder.encode(userPojo.getPassword()));
User storedUser = save(user);
storedUser.setPassword(null);
userEventPublisher.publishUserCreatedEvent(storedUser);
return Optional.of(userToPojoMapper.convert(storedUser));
}
}

 
Now within the user registration, we add a publish within a transaction. Please be careful that the code block that is publishing the event is within a transaction otherwise these don't perform any operations. 
package in.springframework.blog.tutorials.endpoints;
import in.springframework.blog.tutorials.entities.TutorialClientDetails;
import in.springframework.blog.tutorials.exceptions.EntityAlreadyExistsException;
import in.springframework.blog.tutorials.pojos.UserPojo;
import in.springframework.blog.tutorials.services.ClientService;
import in.springframework.blog.tutorials.services.UserService;
import in.springframework.blog.tutorials.utils.MyConstants;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.websocket.server.PathParam;
import java.util.Optional;
@Log4j2
@RestController
@RequestMapping(MyConstants.REGISTRATION_ENDPOINT)
public class RegistrationEndpoint {
@Autowired
private UserService userService;
@Autowired
private ClientService clientService;
@RequestMapping(value = "/user", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public Optional<UserPojo> createUser(@PathParam(value="tenant") String tenant,
@RequestBody UserPojo userPojo) {
try {
return userService.createUser(userPojo);
}
catch(DataIntegrityViolationException divx) {
throw new EntityAlreadyExistsException(String.format("User %s or email %s already exists", userPojo.getUsername(), userPojo.getEmail()));
}
}
@RequestMapping(value = "/client", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize("hasAnyAuthority('ADMIN', 'TENANT_ADMIN')")
public Optional<TutorialClientDetails> getClient(@RequestBody TutorialClientDetails clientDetails) {
return clientService.create(clientDetails);
}
}
Look at the createUser method that is implementing /user request mapping and method POST. The penultimate line in the method is publishing the event. Now let's test this code.

Just to test the code, we also run a kafka-console-consumer listening to the topic to which our code is publishing the message.
First we try to create a user that is already existing.

$ curl --request POST   --url http://localhost:8081/tenant1/registration/user   \
 --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='   \
 --header 'cache-control: no-cache'   \
 --header 'content-type: application/json'   \
 --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205'   \
 --header 'x-tenant: tenant1'   \
 --data '{"username": "admin8","password": "admin1234","audience" : "self"}' | python -m json.tool
 {
    "error": "Conflict",
    "message": "",
    "path": "/tenant1/registration/user",
    "status": 409,
    "timestamp": 1596015557641
}

Now we try to create a user that doesn't exist.
$ curl --request POST   --url http://localhost:8081/tenant1/registration/user \
--header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='\
--header 'cache-control: no-cache' \
--header 'content-type: application/json'\
--header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205' \
--header 'x-tenant: tenant1' \
--data '{"username": "admin9","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "createdAt": 1596015670301,
    "createdBy": "UnAuthenticated",
    "id": 21,
    "mask": 1,
    "tenantId": 1,
    "updatedAt": 1596015670301,
    "updatedBy": "UnAuthenticated",
    "username": "admin9"
}
We can also check our Kafka consumer that we ran earlier.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FirstTopic
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterRollback)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterCompletion)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCompletion)
The first three events are for the failed transaction which rolled back and the last three are for the successful transaction.