-
[실시간 채팅 서비스#3] WebSocket + STOMP + Pub/Sub으로 실시간 채팅 구현개발 일지 2024. 10. 7. 20:42
WebSocket
WebSocket은 HTTP와 같은 통신 규약이다.
단방향 연결을 지원하는 HTTP와 호환되어, 실시간 양방향 연결을 지원한다.
실시간으로 데이터 교환이 빠르게 이루어지는 서비스에서 주로 사용된다.WebSocket만으로도 저수준의 채팅 서비스를 개발하는 것은 가능하다.
하지만 메시징 패턴(Pub/Sub)이나 고수준 핸들링을 구현할 클래스는 부재하다.
간단히 말하면 채널이 단 한 개만 존재하며, 서버가 세부적인 핸들링을 구현하기 어렵다.
STOMP
STOMP는 위와 같은 WebSocket의 부족한 점을 보충해 주는 메세지 브로커이다.
클라이언트에게 진입점을 제공하여 채널을 구독(SubScribe)하거나 그 채널에 메시지를 발행(Publish)하는 과정을 관리하고, 발행된 메시지를 구독자들에게 전달하는 역할을 수행한다.
또한 STOMP 명령어(CONNECT
,SUBSCRIBE
,SEND
,DISCONNECT
등)를 사용하여 메시지 전송, 구독, 연결 관리 등을 수행한다.@Component @Slf4j public class StompHandler implements ChannelInterceptor { @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); if (StompCommand.CONNECT.equals(accessor.getCommand())) { // 클라이언트가 WebSocket을 통해 연결할 때 처리 로직 } else if (StompCommand.SUBSCRIBE.equals(accessor.getCommand())) { // 클라이언트가 채팅방에 입장할 때 처리 로직 } else if (StompCommand.SEND.equals(accessor.getCommand())) { // 클라이언트가 메시지를 보낼 때 처리 로직 } else if (StompCommand.UNSUBSCRIBE.equals(accessor.getCommand())) { // 클라이언트가 채팅방에 퇴장할 때 처리 로직 } return ChannelInterceptor.super.preSend(message, channel); } }
@Configuration @EnableWebSocketMessageBroker @RequiredArgsConstructor public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { private final StompHandler stompHandler; @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/sub"); registry.setApplicationDestinationPrefixes("/pub"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws/chat").setAllowedOriginPatterns("*").withSockJS(); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors(stompHandler); } }
Pub/Sub
앞서 STOMP를 통해 Pub/Sub 패턴을 구현할 수 있다고 설명했다.
하지만 STOMP는 애플리케이션 내부에서 동작한다.
때문에 분산 서버 환경에서 서버 간의 상태를 공유할 수 없다.또한 STOMP는 메시지에 대한 내구성을 보장하지 않기 때문에, 메시지를 유실하거나 전송에 실패한다면 다시 복구하는 것이 불가능하다.
위와 같은 단점을 보충하는 것 외에도 서버 부하 감소, 비동기 처리, 이벤트 기반 아키텍처 등 다양한 이점을 누리기 위해 별도의 메세지 브로커를 사용하여 Pub/Sub 패턴을 구현한다.
그 중에서도 단순한 구조를 가진 Redis를 사용해 Pub/Sub을 구현해 보았다.@Configuration public class RedisConfig { @Value("${spring.data.redis.host}") private String redisHost; @Value("${spring.data.redis.port}") private int redisPort; @Bean public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() { return new LettuceConnectionFactory(redisHost, redisPort); } @Bean public ReactiveRedisTemplate<String, Object> reactiveRedisTemplate( ReactiveRedisConnectionFactory factory) { Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class); RedisSerializationContext.RedisSerializationContextBuilder<String, Object> builder = RedisSerializationContext.newSerializationContext(new StringRedisSerializer()); RedisSerializationContext<String, Object> context = builder.value(serializer).build(); return new ReactiveRedisTemplate<>(factory, context); } @Bean public ReactiveRedisMessageListenerContainer redisMessageListenerContainer( ReactiveRedisConnectionFactory connectionFactory) { return new ReactiveRedisMessageListenerContainer(connectionFactory); } @Bean public MessageListenerAdapter messageListenerAdapter(ChatSubscriber subscriber) { return new MessageListenerAdapter(subscriber, "sendMessage"); } }
위 코드에선 대부분 Reactive 객체들을 사용하고 있지만, 일반 객체여도 구성에는 변함이 없다.
주목할 점은 메세지를 수신하는 MessageListenerAdapter와 그 집합을 관리하는 MessageListenerContainer이다.위 프로젝트에선 클라이언트가 동적으로 채팅방을 생성하는 기능을 제공하기 때문에, 설정 파일에선 MessageListenerContainer에 MessageListenerAdapter를 설정하지 않았다.
만약 정적으로 채널이 존재한다면, redisMessageListenerContainer 메서드에서 정적으로 messageListenerAdapter 설정을 해도 좋다.@Component @RequiredArgsConstructor @Slf4j public class RedisTopicManager { public static final String chatRoomTopic = "chatroom:"; private final ReactiveRedisMessageListenerContainer listenerContainer; private final MessageListenerAdapter messageListenerAdapter; // 해당 채널을 구독하는 메서드 public void subscribeToTopic(String chatRoomId) { ChannelTopic topic = new ChannelTopic(chatRoomTopic + chatRoomId); listenerContainer .receive(topic) .map(message -> (String) message.getMessage()) .doOnSubscribe(s -> log.info("Subscribed to topic: " + topic.getTopic())) .doOnNext( message -> { log.info("Received message from Redis: " + message); // 메시지를 ChatSubscriber에 전달 messageListenerAdapter.onMessage( new DefaultMessage( topic.getTopic().getBytes(), message.getBytes()), null); }) .doOnError(e -> log.error("Error while receiving Redis message", e)) .doOnComplete( () -> log.info( "Completed receiving message from Redis: " + topic.getTopic())) .subscribe(); // 구독 시작 } public Mono<ChannelTopic> getTopicForChatRoom(String chatRoomId) { return Mono.just(new ChannelTopic(chatRoomTopic + chatRoomId)); } }
동적으로 채널을 생성 한다면 위와 같은 TopicManager 클래스를 구현하는 방법이 있다.
클라이언트가 채팅방 입장 API에 성공 후 이벤트를 발행하여 해당 채널에 대한 구독을 설정하기 위해 ReactiveRedisMessageListenerContainer에 MessageListenerAdapter를 등록한다.
이를 통해 해당 채널로 발행된 메세지를 구독자가 실시간으로 수신할 수 있게 된다.Reactive가 아닌 RedisMessageListenerContainer라면 구현은 더욱 간단하다.
// 구독 추가 public void subscribeToChannel(String chatRoomId) { ChannelTopic topic = new ChannelTopic("chatroom:" + chatRoomId); redisMessageListenerContainer.addMessageListener(messageListener, topic); } // 구독 해제 public void unsubscribeFromChannel(String chatRoomId) { ChannelTopic topic = new ChannelTopic("chatroom:" + chatRoomId); redisMessageListenerContainer.removeMessageListener(messageListener, topic); }
ReactiveRedisMessageListenerContainer의 경우 구독 해제를 구현하려면 Disposable 객체를 애플리케이션이 직접 관리해야 하여 분산 서버에서의 구현이 불가한 것으로 이해하고 있다.
내가 모르는 방법으로 ReactiveRedisMessageListenerContainer의 구독 해제를 구현하는 방법이 존재할 지도 모르지만,일단은 사용되지 않는 채널을 비활성화 시키기 위해 RedisMessageListenerContainer 사용을 선택했다.
@Component @RequiredArgsConstructor @Slf4j public class ChatPublisherImpl implements ChatPublisher { private final ReactiveRedisTemplate<String, Object> redisTemplate; private final RedisTopicManager topicManager; @Override public void publish(final ChatMessageDto message) { // roomId에 대한 채널 토픽 조회 ChannelTopic topic = topicManager.getTopicForChatRoom(message.getRoomId()).block(); if (topic != null) { // 채널에 메세지 발행 redisTemplate.convertAndSend(topic.getTopic(), message).subscribe(); } else { log.warn("Topic for chat room {} dies not exist!", message.getRoomId()); } } }
@Component @RequiredArgsConstructor @Slf4j public class ChatSubscriberImpl implements ChatSubscriber { private final ObjectMapper om; private final SimpMessageSendingOperations messageTemplate; @Override public void sendMessage(final String message) { try { ChatMessageDto chatMessageDto = om.readValue(message, ChatMessageDto.class); messageTemplate.convertAndSend( "/sub/chat/room/" + chatMessageDto.getRoomId(), chatMessageDto); } catch (Exception e) { log.error(e.getMessage()); throw new MessageSendException(); } } }
메세지를 수신 받은 RedisMessageListenerContainer는 내부의 MessageListenerAdapter로 메세지를 전달하며, MessageListenerAdapter는 내부에 등록한ChatSubscriberImpl의 sendMessage 메서드를 실행한다.
SimpMessageSendingOperations를 통해 STOMP 거쳐 특정 경로를 구독한 클라이언트에게 메시지가 전송된다.'개발 일지' 카테고리의 다른 글
[실시간 채팅 서비스#5] Kafka vs RabbitMQ 성능 비교 - 실시간 WebSocket 메시징 테스트 (0) 2025.04.17 [실시간 채팅 서비스#4] API-Gateway (0) 2024.10.17 [실시간 채팅 서비스#0] 타임라인 (0) 2024.09.20 [실시간 채팅 서비스#2] 설계 (1) 2024.09.01 [실시간 채팅 서비스#1] 개인 프로젝트 시작 (0) 2024.08.21