WebSockets, Pub/Sub Operations and Real-Time Communication

Websocket

WebSocket is a communication protocol that enables full-duplex (two-way) communication between a client and a server over a single connection.

Once the connection is established, either side can send messages to the other at any time.

Key features of Websocket

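  • It supports full-duplex communication between client and server.
  • It keeps the connection between them open after the handshake. This persistent connection is different from long polling, where the client repeatedly issues HTTP requests and waits for each response.
  • It can also be used as part of an event-driven architecture.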

Websockets and Real-Time Communication

Websockets are crucial for enabling seamless, two-way communication between client and server, particularly in applications requiring real-time interaction. In the context of language translation, websockets power systems like our real-time audio translator, where live audio streams are processed and translated instantly. This low-latency connection ensures that users experience uninterrupted, real-time language translation, making websockets an essential component for delivering smooth, global communication solutions.

Pub/Sub Model Explanation

In the Pub/Sub model, the server broadcasts messages to a specific topic, and a client must be subscribed to that topic to receive them.

A client that subscribes after a message has been broadcast will not receive that message, so it needs to subscribe before the server broadcasts.

In this approach, only the server sends messages to the clients; the clients do not send messages to the server.

Websocket: Key Components of Pub/Sub model

  • Publisher: The sender that publishes messages.
  • Subscriber: The receiver that gets the published messages.
  • Topic: The named channel to which the Publisher publishes messages and from which Subscribers receive them.
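To make the three roles concrete, here is a minimal, framework-free sketch in plain Java. It is not how Spring's broker is implemented; the `PubSubSketch` and `Broker` names are hypothetical and exist only for illustration. It also shows the point made above: a subscriber that joins late misses whatever was published earlier, because nothing is stored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class PubSubSketch {

    /** Hypothetical in-memory broker, used only to illustrate the roles. */
    static class Broker {
        // topic name -> subscribers currently registered on that topic
        private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

        /** Subscriber side: register a callback for a topic. */
        void subscribe(String topic, Consumer<String> subscriber) {
            subscribers.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(subscriber);
        }

        /**
         * Publisher side: deliver the message to whoever is subscribed right now.
         * Nothing is stored, so later subscribers never see it.
         * Returns the number of subscribers that received the message.
         */
        int publish(String topic, String message) {
            List<Consumer<String>> current = subscribers.getOrDefault(topic, List.of());
            current.forEach(subscriber -> subscriber.accept(message));
            return current.size();
        }
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        broker.subscribe("/topic/user", early::add);
        broker.publish("/topic/user", "first");      // only the early subscriber exists
        broker.subscribe("/topic/user", late::add);  // joins after "first" was published
        broker.publish("/topic/user", "second");

        System.out.println("early: " + early); // early: [first, second]
        System.out.println("late:  " + late);  // late:  [second]
    }
}
```

The late subscriber only sees "second", which is why a client has to subscribe before the server broadcasts.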

Code visualisation (Configuration of Websocket as Pub/Sub model)

				
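import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker that relays messages sent to destinations starting with /topic
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Clients connect to /websocket; SockJS provides a fallback when WebSocket is unavailable
        registry.addEndpoint("/websocket")
                .setAllowedOriginPatterns("*") // allows every origin; restrict this in production
                .withSockJS();
    }
}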

				
			

Inject the class below wherever you want to broadcast from:

				
					@Autowired
private SimpMessagingTemplate simpMessagingTemplate; 

				
			

Now we can broadcast using the method below:

				
// Sends the user object to every client currently subscribed to /topic/user
simpMessagingTemplate.convertAndSend("/topic/user", user);

				
			

Real Time communication between client and server

In this architecture, the server is configured for real-time, two-way communication with the client. Once a connection is established, both sides can send messages at the same time. Real-world examples of this approach include chat applications and online status indicators.

We can also add interceptors to the WebSocket requests, for example to validate authorization and then proceed with or abort the request accordingly.
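As a rough illustration of the proceed/abort decision, here is a framework-free sketch in plain Java. In Spring, this kind of check would typically live in a HandshakeInterceptor, whose beforeHandshake method returns a boolean that decides whether the handshake continues. Everything named here is an assumption for the example: the `HandshakeCheckSketch` class, the `token` query parameter, and the hard-coded token table. A real application would verify a signed token or a server-side session instead.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class HandshakeCheckSketch {

    // Hypothetical token store: token -> username (illustration only).
    private static final Map<String, String> KNOWN_TOKENS = Map.of("secret-token-1", "alice");

    /** Returns the value of the first query parameter with the given name, if any. */
    static Optional<String> queryParam(URI uri, String name) {
        String query = uri.getQuery();
        if (query == null) {
            return Optional.empty();
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals(name)) {
                return Optional.of(pair.substring(eq + 1));
            }
        }
        return Optional.empty();
    }

    /**
     * Returns true to proceed with the handshake and false to abort it.
     * On success, the resolved username is stored in the attributes map
     * so that a handler could read it later.
     */
    static boolean beforeHandshake(URI uri, Map<String, Object> attributes) {
        // An unknown token maps to null, which Optional.map turns into an empty Optional.
        Optional<String> username = queryParam(uri, "token").map(KNOWN_TOKENS::get);
        username.ifPresent(name -> attributes.put("username", name));
        return username.isPresent();
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        boolean accepted = beforeHandshake(
                URI.create("ws://localhost:8080/websocket?token=secret-token-1"), attributes);
        boolean rejected = beforeHandshake(
                URI.create("ws://localhost:8080/websocket?token=wrong"), new HashMap<>());

        System.out.println(accepted + " " + attributes); // true {username=alice}
        System.out.println(rejected);                    // false
    }
}
```

Resolving the user once at handshake time means the message handler does not have to re-validate every incoming frame.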

Code visualisation (Websocket for real time configuration)

				
					
	
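import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Route connections on /websocket to the handler; "*" allows every origin
        registry.addHandler(new MyWebSocketHandler(), "/websocket")
                .setAllowedOriginPatterns("*");
    }
}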


				
			

Message Handler for client messages

The handler below processes messages the way a chat application would, assuming that one user can have multiple sessions.

				
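import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.util.UriComponentsBuilder;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

@Log4j2
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
    /**
     * Sessions per username; one user may have several sessions open at once.
     */
    private final Map<String, List<WebSocketSession>> sessionsCache = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Adds the session to sessionsCache when a connection is established.
     *
     * @param session the newly opened session
     * @throws Exception if closing a rejected session fails
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        final String username = extractUsernameFromSession(session);
        if (username == null) {
            // ConcurrentHashMap rejects null keys, so refuse sessions without a username
            session.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        sessionsCache.computeIfAbsent(username, k -> new CopyOnWriteArrayList<>()).add(session);
        log.info("Connection established for {}: {}", username, sessionsCache.get(username));
    }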


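    /**
     * Extracts the username from the "username" query parameter of the session URI.
     *
     * @param webSocketSession the session to inspect
     * @return the username, or null if the parameter is missing
     */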
    private String extractUsernameFromSession (final WebSocketSession webSocketSession) {
        final MultiValueMap<String, String> queryParams = UriComponentsBuilder
                .fromUri(Objects.requireNonNull(webSocketSession.getUri()))
                .build()
                .getQueryParams();
        return queryParams.getFirst("username");
    }


    /**
     * Forwards a message to every active session of the target user.
     *
     * @param session the session the message arrived on
     * @param message the incoming text message (JSON)
     * @throws Exception if the payload cannot be parsed or serialized
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message)
            throws Exception {
        final MessageRequestDto messageRequestDto = objectMapper.readValue(message.getPayload(),
                MessageRequestDto.class);
        final var toUser = messageRequestDto.getToUser();
        // Look the target up once; a null key would make ConcurrentHashMap throw
        final List<WebSocketSession> targetSessions = toUser == null ? null : sessionsCache.get(toUser);
        if (targetSessions == null) {
            log.error("Message sending failed because user {} is not connected", toUser);
            return;
        }
        final var messageJson = objectMapper.writeValueAsString(new MessageResponseDto(
                extractUsernameFromSession(session), messageRequestDto.getMessage(), messageRequestDto.getTime()));
        // Set to true once at least one session has received the message
        final AtomicBoolean delivered = new AtomicBoolean(false);
        targetSessions.forEach(webSocketSession -> {
            try {
                webSocketSession.sendMessage(new TextMessage(messageJson));
                delivered.set(true);
            } catch (IOException e) {
                // Pass the exception as the last argument so the stack trace is logged
                log.error("Error sending message to session {}", webSocketSession.getId(), e);
            }
        });
        if (delivered.get()) {
            log.info("Message {} sent to active sessions of user {}", messageRequestDto.getMessage(), toUser);
        }
    }

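    /**
     * Removes the session from sessionsCache when a connection is closed.
     *
     * @param session the session that was closed
     * @param status  the reason the connection was closed
     * @throws Exception declared by the overridden method
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
            throws Exception {
        final String username = extractUsernameFromSession(session);
        // The user may have no entry, for example if the session was never registered
        final List<WebSocketSession> sessions = username == null ? null : sessionsCache.get(username);
        if (sessions != null && sessions.remove(session)) {
            log.warn("Connection closed {}", session);
        }
    }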


}
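The per-user session bookkeeping in the handler can also be looked at on its own. The sketch below is a plain-Java illustration under a few assumptions: the `SessionRegistrySketch` and `SessionRegistry` names are hypothetical, and the type parameter `S` stands in for WebSocketSession so the example runs without Spring. Unlike the handler above, it drops a user's entry once the last session is gone, so the number of keys matches the number of connected users.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class SessionRegistrySketch {

    /** Hypothetical registry; S stands in for WebSocketSession. */
    static class SessionRegistry<S> {
        private final Map<String, List<S>> sessionsByUser = new ConcurrentHashMap<>();

        /** Adds a session for the user; compute keeps the update atomic per user. */
        void register(String username, S session) {
            sessionsByUser.compute(username, (user, sessions) -> {
                List<S> list = (sessions == null) ? new CopyOnWriteArrayList<S>() : sessions;
                list.add(session);
                return list;
            });
        }

        /** Removes the session and drops the user's entry once no session is left. */
        void unregister(String username, S session) {
            sessionsByUser.computeIfPresent(username, (user, sessions) -> {
                sessions.remove(session);
                return sessions.isEmpty() ? null : sessions; // returning null removes the key
            });
        }

        /** All sessions a message for this user should be sent to. */
        List<S> sessionsOf(String username) {
            return sessionsByUser.getOrDefault(username, List.of());
        }

        /** Number of users with at least one open session. */
        int connectedUsers() {
            return sessionsByUser.size();
        }
    }

    public static void main(String[] args) {
        SessionRegistry<String> registry = new SessionRegistry<>();
        registry.register("alice", "phone");
        registry.register("alice", "laptop");
        registry.register("bob", "desktop");
        System.out.println(registry.sessionsOf("alice") + " users=" + registry.connectedUsers());

        registry.unregister("alice", "phone");
        registry.unregister("alice", "laptop");
        System.out.println(registry.sessionsOf("alice") + " users=" + registry.connectedUsers());
    }
}
```

Keeping this logic in a small class of its own makes it possible to test the connect and disconnect paths without starting a server.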

				
			

Conclusion

In this blog, I have described WebSocket and shown how clients and servers can communicate with each other simultaneously, using broadcasting and real-time connectivity. With this setup, the server knows in real time how many clients are connected.

We also know the connection status of the server and the client, so that they can communicate with each other.
