WebFlux, WebSocket i wysyłanie wiadomości do konkretnego usera/userów

0

Cześć,

czy wie ktoś może jak w WebFluxie wysłać wiadomość do konkretnej grupy userów przy pomocy websocketów? Chodzi mi o np. aplikację typu czat z wieloma pokojami. Każdy użytkownik może wejść do wielu pokojów, ale wysłana wiadomość w danym pokoju trafia tylko do znajdujących się w nim użytkowników.
W Springu na servletach mogę użyć @EnableWebSocketMessageBroker, SimpMessageSendingOperations i np @MessageMapping("chat/{roomId}/sendMessage") w controllerze, ale WebFlux nie obsługuje stomp i nie wiem jak to rozwiązać.

Robił ktoś z Was coś podobnego?

0

Tak, skleiłem z tego taki kod:


import io.vavr.control.Try;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.FluxSink;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();
    public boolean push(String greeting) {
        return queue.offer(greeting);
    }
    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));
            }
        });
    }


@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {
    private final GreetingsService greetingsService = new GreetingsService();
    private final GreetingsPublisher greetingsPublisher;
    private final Flux<String> publisher;
    public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
        this.publisher = Flux.create(greetingsPublisher).share();
    }
    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(helloMessage -> greetingsService.greeting(helloMessage))
                .doOnNext(greetings -> greetingsPublisher.push(greetings))
                .subscribe();
        final Flux<WebSocketMessage> message = publisher
                .map(greetings -> webSocketSession.textMessage(greetings));
        return webSocketSession.send(message);
    }
}

W ReactiveWebSocketHandler mam this.publisher = Flux.create(greetingsPublisher).share();. Kiedy dostaję message przez WebSocket, to pcham ją na kolejkę w GreetingsPublisher, z której pobieram przez sink.next(greeting) i emituję we Fluxa... W ten sposób wszyscy subskrybenci dostają dany message - każdy połączony przez WebSocket. Nie mam natomiast pojęcia jak zrobić, żeby wysłać message do konkretnej grupy użytkowników?

0

Niedawno się tym bawiłem i zrobiłem czata p2p. Proponuję na początek wyrzucić WebFluksa i zrobić to na WebMVC. Jak zadziała i ogarniesz, co Spring konfiguruje pod spodem (jest sporo konwencji nazewniczych np. topiców, prawdopodobnie będziesz musiał zejść głębiej), powrót na reaktywny stack nie powinno być problemem. Ewentualnie wrzuć koda na githuba, chętnie pomogę.

1

Chwilowo nie mam sensownej klawiatury więc napisze ogólnie. Zamiast subscribe wpisz flatMap. W jego srodku wyciagnij usera ( z websocketa lub wiadomosci). Wyciagnij pokoje. Potem wyciagnij z publishera wiadomosci, filteruj po pokojach i to zwroc jako wynik flatMap. (No i słowko return trzeba przeniesc).

0

Tak jak @jarekr000000 pisze, musisz zwinąć wszystkie instrukcje w jeden pipeline. Nawet IntelliJ podświetla subscribe(): "Calling subscribe() in non-blocking scope". Postaram się wkleić kod.

Update. Kod, który wkleiłeś można zapisać jak poniżej. Nie ma jeszcze obsługi "pokojów" :)

 @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        return webSocketSession.send(
                webSocketSession.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(greetingsService::greeting)
        );
    }
0

Niestety, nadal tego nie ogarniam.

Mi IntelliJ nie podświetla subscribe(), ale ja używam Community, więc może jest mniej intelli ;) W każdym razie Twój kod @Charles_Ray nie zadziała poprawnie, bo wyśle wiadmość tylko do samego siebie. Mi chodzi o to, żeby dwie sesje websocketów mogły wymieniać wiadomości między sobą. I po to cała ta zabawa z publisherem -> mamy Fluxa, który dostępny jest dla wszystkich websocketowych sesji, dajemy na nim .share() po to, aby wiadomość była dostępna dla wszystkich consumerów (inaczej jedna sesja by skonsumowała i druga nie miałaby prawa jej widzieć).

Próbowałem to ogarnąć z .flatMap tak jak radziliście, ale mi to nie działa poprawnie. Messege idą w nieprzewidywalny sposób, raz jedna, raz wiele na raz, raz w ogóle. Do tego jak rozłączymy wszystkie sesje WebSocket i połączymy jeszcze raz to wiadomości w ogóle nie dochodzą.

        return webSocketSession.send(
                webSocketSession.receive()
                        .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                        .map(helloMessage -> greetingsService.greeting(helloMessage))
                        .map(greetings -> greetingsPublisher.push(greetings))
//                        .doOnNext(greetings -> greetingsPublisher.push(greetings))
                        .flatMap(greetings -> publisher
                                .map(publisherGreetings -> webSocketSession.textMessage(publisherGreetings)))
        );

Działa mi to tylko jeśli na webSocketSession.receive() dam subscribe().
Kod: https://github.com/kkojot/multiroomchat-spring-webflux-vue/tree/master/spring-backend/src/main/java/com/kojodev/blog/multiroomchat/webfluxmultiroomchat/greetings
Ja testuję to przy pomocy Apic Chrome extension https://chrome.google.com/webstore/detail/apic-complete-api-solutio/ggnhohnkfcpcanfekomdkjffnfcjnjam. Robię dwa połączenia na ws://localhost:8080/greetings i pcham message {"name":"kojot"}

Co do drugiego zadania, filtrowania wiadomości do konkretnych userów próbwałem zrobić tak jak @jarekr000000 pisał, ale to nie działa mi w ogóle. Wiadomości idą do wszystkich sesji.

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        /* share greetings across websocket sessions */
        webSocketSession
                .receive()
                .map(webSocketMessage -> webSocketMessage.getPayloadAsText())
                .map(message -> new SimpleTeamMessageResolver(webSocketSession, message))
                .map(resolver -> resolver.process())
                .map(message -> teamMessagePublisher.push(message))
                .subscribe();

        return webSocketSession.send(
                SimpleTeamMessageResolver
                        .filterTeamMessage(webSocketSession, publisher)
                        .map(message -> webSocketSession.textMessage(message)));
    }
public class SimpleTeamMessageResolver {

    private final WebSocketSession webSocketSession;
    private final String message;

    public SimpleTeamMessageResolver(WebSocketSession webSocketSession, String message) {
        this.webSocketSession = webSocketSession;
        this.message = message;
    }

    public static Flux<String> filterTeamMessage(WebSocketSession session, Flux<String> messages) {
        final String teamColor = teamColorFromSession(session);
        if (teamColor.equals("UNKNOWN")) return messages;
        final Flux<String> filtered = messages
                .filter(message -> message.startsWith(teamColor));
        return filtered;
    }

    private static String teamColorFromSession(WebSocketSession session) {
        return Option.of(session
                .getAttributes()
                .get("team"))
                .map(color -> color.toString())
                .getOrElse("UNKNOWN");
    }

    public String process() {
        if (message.startsWith("join:")) {
            return handleJoin();
        } else {
            return handleMessage();
        }
    }

    private String handleJoin() {
        final String teamColor = teamColorFromMessage();
        webSocketSession
                .getAttributes()
                .put("team", teamColor);
        return "New guy joined the " + teamColor + " team.";
    }

    private String handleMessage() {
        final String teamColor = teamColorFromSession(webSocketSession);
        return teamColor + ": " + message;
    }

    public String teamColorFromMessage() {
        return Option.of(message.split(":"))
                .map(strings -> strings.length > 1 ? strings[1] : "BLUE")
                .map(color -> color.toUpperCase().trim().equals("RED") ? "RED" : "BLUE")
                .getOrElse("BLUE");
    }
}

Testowałem tak jak wyżej greetings, endpoint ws://localhost:8080/team. Messege typu join: BLUE i join: RED przypisują nas do teamu, później każdy jeden String chciałbym aby szedł tylko do sesji w danym teamie. Próbowałem to debugować, ale ogólnie to w metodę handle() czy process() wchodzę tylko po nawiązaniu połączenia z WebSocket, a nie za każdym messegem.

Kod: https://github.com/kkojot/multiroomchat-spring-webflux-vue/tree/master/spring-backend/src/main/java/com/kojodev/blog/multiroomchat/webfluxmultiroomchat/team

Kończą mi się pomysły, jeśli Wy jakieś macie to śmiało piszcie ;)

0

Filtrowanie Fluxa jak wyżej, nawet jakby zadziałało, prowadziłoby do niezłego spaghetti code w miarę rozwoju aplikacji. Na szczęście najnowsze WebFluxy wchodzą z supportem RSocket.
Patrząć na przykład w dokumentacji https://docs.spring.io/spring-boot/docs/2.2.0.M6/reference/html/spring-boot-features.html#boot-features-rsocket to jest właśnie to, co potrzebuję. Niestety, o ile nawiązanie połączenia klient-serwer z WebFlux -> WebFlux nie ma problemu, o tyle JavaScript -> WebFlux nie udało mi się spiąć. Problemem leży w route i metadata (w rsocket-js).
Tutaj robili workaround https://github.com/bclozel/spring-flights/issues/5, ale poczekam aż to poprawią i może dodadzą security. Póki co API jest dość niestabilne i ciężko w to brnąć.

Z tego powodu odpuszczam WebFluxa i programowanie reaktywne. Za dużo problemów, za mało korzyści.

1 użytkowników online, w tym zalogowanych: 0, gości: 1