First, I create an event POJO that carries the data to be published.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package in.springframework.blog.tutorials.events; | |
import lombok.Builder; | |
import lombok.Data; | |
@Builder | |
@Data | |
public class UserCreatedEvent { | |
private long userId; | |
private String username; | |
private String fullname; | |
private String status; | |
} |
Next, we create an event listener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package in.springframework.blog.tutorials.listeners; | |
import in.springframework.blog.tutorials.events.UserCreatedEvent; | |
import in.springframework.blog.tutorials.utils.MyConstants; | |
import lombok.extern.log4j.Log4j2; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Qualifier; | |
import org.springframework.kafka.core.KafkaTemplate; | |
import org.springframework.stereotype.Component; | |
import org.springframework.transaction.event.TransactionPhase; | |
import org.springframework.transaction.event.TransactionalEventListener; | |
import org.springframework.util.concurrent.ListenableFutureCallback; | |
@Component | |
@Log4j2 | |
public class UserCreatedEventListener { | |
@Autowired | |
@Qualifier(MyConstants.FIRST_TOPIC_TEMPLATE_NAME) | |
private KafkaTemplate firstKafkaTemplate; | |
@TransactionalEventListener(phase= TransactionPhase.BEFORE_COMMIT) | |
void handleBeforeCommit(UserCreatedEvent userCreatedEvent) { | |
log.error("Before Commit " + userCreatedEvent); | |
sendMessage(userCreatedEvent, "BeforeCommit"); | |
} | |
@TransactionalEventListener(phase= TransactionPhase.AFTER_COMMIT) | |
void handleAfterCommit(UserCreatedEvent userCreatedEvent) { | |
log.error("After Commit " + userCreatedEvent); | |
sendMessage(userCreatedEvent, "AfterCommit"); | |
} | |
@TransactionalEventListener(phase= TransactionPhase.AFTER_ROLLBACK) | |
void handleAfterRollback(UserCreatedEvent userCreatedEvent) { | |
log.error("After Rollback " + userCreatedEvent); | |
sendMessage(userCreatedEvent, "AfterRollback"); | |
} | |
@TransactionalEventListener(phase= TransactionPhase.AFTER_COMPLETION) | |
void handleAfterCompletion(UserCreatedEvent userCreatedEvent) { | |
log.error("After Completion " + userCreatedEvent); | |
sendMessage(userCreatedEvent, "AfterCompletion"); | |
} | |
private void sendMessage(UserCreatedEvent userCreatedEvent, String status) { | |
userCreatedEvent.setStatus(status); | |
firstKafkaTemplate.send(MyConstants.FIRST_TOPIC, userCreatedEvent.toString()).addCallback(new ListenableFutureCallback() { | |
@Override | |
public void onFailure(Throwable throwable) { | |
log.info(String.format("Message '%s' failed!", userCreatedEvent.toString())); | |
} | |
@Override | |
public void onSuccess(Object o) { | |
log.info(String.format("Message '%s' sent successfully!", userCreatedEvent.toString())); | |
} | |
}); | |
} | |
} |
As you can see, each handler method in the event listener hooks into a particular phase of the transaction commit cycle. At each phase of the cycle, we publish an event to the Kafka topic 'FirstTopic'.
Now we create an event publisher.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package in.springframework.blog.tutorials.publishers; | |
import in.springframework.blog.tutorials.entities.User; | |
import in.springframework.blog.tutorials.events.UserCreatedEvent; | |
import lombok.extern.log4j.Log4j2; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.ApplicationEventPublisher; | |
import org.springframework.stereotype.Component; | |
@Component | |
@Log4j2 | |
public class UserEventPublisher { | |
@Autowired | |
private ApplicationEventPublisher applicationEventPublisher; | |
public void publishUserCreatedEvent(final User user) { | |
log.info(String.format("Publishing event for user %s with id %d", user.getUsername(), user.getId())); | |
applicationEventPublisher.publishEvent(UserCreatedEvent.builder() | |
.userId(user.getId()) | |
.username(user.getUsername()) | |
.fullname(user.getFullname()) | |
.build() | |
); | |
} | |
} |
We add a createUser method to the user service and mark it @Transactional. This is important: it ensures that any exception thrown inside createUser rolls the transaction back and is propagated to the caller.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package in.springframework.blog.tutorials.services; | |
import in.springframework.blog.tutorials.entities.Tenant; | |
import in.springframework.blog.tutorials.entities.User; | |
import in.springframework.blog.tutorials.mappers.UserPojoMapper; | |
import in.springframework.blog.tutorials.pojos.Role; | |
import in.springframework.blog.tutorials.pojos.UserPojo; | |
import in.springframework.blog.tutorials.publishers.UserEventPublisher; | |
import in.springframework.blog.tutorials.repositories.TenantRepository; | |
import in.springframework.blog.tutorials.repositories.UserRepository; | |
import in.springframework.blog.tutorials.utils.TutorialRequestContext; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.security.crypto.password.PasswordEncoder; | |
import org.springframework.stereotype.Service; | |
import org.springframework.transaction.annotation.Transactional; | |
import java.util.Optional; | |
@Service | |
public class UserService { | |
@Autowired | |
private UserRepository userRepository; | |
@Autowired | |
private TenantRepository tenantRepository; | |
@Autowired | |
private PasswordEncoder passwordEncoder; | |
@Autowired | |
private UserPojoMapper userToPojoMapper; | |
@Autowired | |
private UserEventPublisher userEventPublisher; | |
public Iterable<User> findAll() { | |
return userRepository.findAll(); | |
} | |
public Optional<User> findByUsername(String username) { | |
return userRepository.findUserByTenantAndUsername(TutorialRequestContext.currentTenant.get(), username); | |
} | |
public Optional<User> retrieveUser(String idOrUserNameOrEmail) { | |
Optional<Tenant> optionalTenant = tenantRepository.findById(TutorialRequestContext.currentTenant.get().getId()); | |
try { | |
Long id = Long.parseLong(idOrUserNameOrEmail); | |
Optional<User> optionalUser = userRepository.findById(id); | |
if (optionalUser.isPresent()) { | |
return optionalUser; | |
} | |
} | |
catch(NumberFormatException e) { | |
} | |
Optional<User> optionalUser = userRepository.findUserByEmail(idOrUserNameOrEmail); | |
if (optionalUser.isPresent()) { | |
return optionalUser; | |
} | |
optionalUser = userRepository.findUserByUsernameAndTenant(optionalTenant.get(), | |
idOrUserNameOrEmail); | |
return optionalUser; | |
} | |
public User save(User user) { | |
return userRepository.save(user); | |
} | |
public void delete(Long id) { | |
userRepository.deleteById(id); | |
} | |
@Transactional | |
public Optional<UserPojo> createUser(UserPojo userPojo) { | |
User user = userToPojoMapper.convert(userPojo); | |
user.setMask(Role.USER.getMask()); | |
user.setTenant(TutorialRequestContext.currentTenant.get()); | |
user.setPassword(passwordEncoder.encode(userPojo.getPassword())); | |
User storedUser = save(user); | |
storedUser.setPassword(null); | |
userEventPublisher.publishUserCreatedEvent(storedUser); | |
return Optional.of(userToPojoMapper.convert(storedUser)); | |
} | |
} |
Now, within user registration, we publish the event inside a transaction. Make sure that the code publishing the event runs within a transaction; otherwise the transactional event listeners are not invoked.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package in.springframework.blog.tutorials.endpoints; | |
import in.springframework.blog.tutorials.entities.TutorialClientDetails; | |
import in.springframework.blog.tutorials.exceptions.EntityAlreadyExistsException; | |
import in.springframework.blog.tutorials.pojos.UserPojo; | |
import in.springframework.blog.tutorials.services.ClientService; | |
import in.springframework.blog.tutorials.services.UserService; | |
import in.springframework.blog.tutorials.utils.MyConstants; | |
import lombok.extern.log4j.Log4j2; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.dao.DataIntegrityViolationException; | |
import org.springframework.http.MediaType; | |
import org.springframework.security.access.prepost.PreAuthorize; | |
import org.springframework.web.bind.annotation.RequestBody; | |
import org.springframework.web.bind.annotation.RequestMapping; | |
import org.springframework.web.bind.annotation.RequestMethod; | |
import org.springframework.web.bind.annotation.RestController; | |
import javax.websocket.server.PathParam; | |
import java.util.Optional; | |
@Log4j2 | |
@RestController | |
@RequestMapping(MyConstants.REGISTRATION_ENDPOINT) | |
public class RegistrationEndpoint { | |
@Autowired | |
private UserService userService; | |
@Autowired | |
private ClientService clientService; | |
@RequestMapping(value = "/user", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) | |
public Optional<UserPojo> createUser(@PathParam(value="tenant") String tenant, | |
@RequestBody UserPojo userPojo) { | |
try { | |
return userService.createUser(userPojo); | |
} | |
catch(DataIntegrityViolationException divx) { | |
throw new EntityAlreadyExistsException(String.format("User %s or email %s already exists", userPojo.getUsername(), userPojo.getEmail())); | |
} | |
} | |
@RequestMapping(value = "/client", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) | |
@PreAuthorize("hasAnyAuthority('ADMIN', 'TENANT_ADMIN')") | |
public Optional<TutorialClientDetails> getClient(@RequestBody TutorialClientDetails clientDetails) { | |
return clientService.create(clientDetails); | |
} | |
} |
Just to test the code, we also run a kafka-console-consumer listening to the topic to which our code is publishing the message.
First we try to create a user that is already existing.
$ curl --request POST --url http://localhost:8081/tenant1/registration/user \
    --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM=' \
    --header 'cache-control: no-cache' \
    --header 'content-type: application/json' \
    --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205' \
    --header 'x-tenant: tenant1' \
    --data '{"username": "admin8","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "error": "Conflict",
    "message": "",
    "path": "/tenant1/registration/user",
    "status": 409,
    "timestamp": 1596015557641
}
Now we try to create a user that doesn't exist.
$ curl --request POST --url http://localhost:8081/tenant1/registration/user \
    --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM=' \
    --header 'cache-control: no-cache' \
    --header 'content-type: application/json' \
    --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205' \
    --header 'x-tenant: tenant1' \
    --data '{"username": "admin9","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "createdAt": 1596015670301,
    "createdBy": "UnAuthenticated",
    "id": 21,
    "mask": 1,
    "tenantId": 1,
    "updatedAt": 1596015670301,
    "updatedBy": "UnAuthenticated",
    "username": "admin9"
}
We can also check our Kafka consumer that we ran earlier.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FirstTopic
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterRollback)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterCompletion)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCompletion)
The first three events are for the failed transaction which rolled back and the last three are for the successful transaction.