Jak włożyć wartości z Listenera do Fluxa?

0

Załóżmy, że mam jakiś tam Listener do ściągania wartości z kolejki. Przykładowa metoda:

 @JmsListener(destination = "mailbox", containerFactory = "myFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

Czyli jeden ściągnięty element = jedno wywołanie metody. I to co potrzebuję zrobić, to wkładać te zdjęte wartości do Fluxa, aby po zasubskrybowaniu się do niego otrzymywać na bieżąco wartości ściągane z kolejki. Chciałem to zrobić jakoś w tym stylu:

Własny interfejs:

public interface CustomListener {

    @JmsListener(destination = "test")
    void receiveMessage(String email);
}

Oraz jego implementacja:

Flux.create(sink -> {
            new CustomListener() {
                @Override
                public void receiveMessage(String email) {
                    sink.next(email);
                }
            };
        });

Z tym, że niestety ta metoda niczego mi nie przechwytuje, nie wiem co robię źle. Próbowałem jeszcze na inny sposób korzystając z java.jms.MessageListener:

        Flux.<String>create(sink -> {
            new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            String text = ((TextMessage) message).getText();
                            sink.next(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            };
        });

Z tym, że tutaj wymagana jest konfiguracja w ten sposób:

  @Bean
  public MessageListenerContainer listenerContainer() {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
      container.setMessageListener(**nazwa klasy implementującej interfejs**);
      return container;
  }

Jak wtedy przekazać nazwę klasy implementującą interfejs do beana konfiguracyjnego skoro MessageListener implementuję anonimowo wewnątrz Flux.create()? Proszę o jakąkolwiek pomoc.

0

@jarekr000000: Wrzucanie white(true) do Flux.create to nie jest antypattern?

0

W sumie zamiast pisać możesz użyć spring integration. IntegrationFlows
Jak tutaj:
https://stackoverflow.com/questions/43126775/how-to-create-a-spring-reactor-flux-from-a-activemq-queue

0

Hmm to nie wiem jak bez while(true) zrobić tamtym sposobem. Co do linku z SO oczywiście próbowałem i udało się połowicznie, tzn. ten Publisher:

  @Bean
    public Publisher<Message<String>> jmsReactiveSource() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.connectionFactory)
                        .destination("testQueue"))
                .channel(MessageChannels.queue())
                .log(LoggingHandler.Level.DEBUG)
                .log()
                .toReactivePublisher();
    }

Automatycznie mi się subskrybował gdy coś trafiało na kolejkę i je z niej ściągał . Jak próbowałem zrobić z niego Fluxa, żeby wykonać w nim jakieś operacje na odebranych danych, czyli tak jak ten gościu ma:

Flux.from(jmsReactiveSource())
                .map(Message::getPayload);

To u mnie ten Flux był pusty ;/ W sensie ani razu nie wyemitował onNext. A wiem, że ten jmsReactiveSource() dane z kolejki w sobie ma, bo wszystkie które odebrał loguje do konsoli.

0
mikee728 napisał(a):
Flux.from(jmsReactiveSource())
                .map(Message::getPayload);

To u mnie ten Flux był pusty ;/ W sensie ani razu nie wyemitował onNext. A wiem, że ten jmsReactiveSource() dane z kolejki w sobie ma, bo wszystkie które odebrał loguje do konsoli.

Jeśli napisałeś jak wyżej to nie dziwne. Ten kod nic nie robi. Standardowy problem reactive nr 1.
Strumień trzeba odebrać.
To sie nie powinno kompilować, ale niesteyty java/kotlin/scala obsysają. Nawet w haskellu można to zrypać.

1

Nie no, aż tak uwsteczniony nie jestem. Subskrybowałem printując do konsoli, zwracałem przez kontroler i za każdym razem Flux był pusty. Dane były jedynie w tym Publisherze.

0

Jak zrobisz dokładnie w tym miejscu tylko jmsReactiveSource() to działa? Jak? Dziwne.

0

Tak, dokładnie. Jeżeli jest obecna linijka .log() to wypisze odebrane do konsoli, jeżeli jej nie ma to tylko zdejmie z kolejki. Jeżeli usunę adnotację @Bean znad tego jmsReactiveSource(), to wtedy nic się nie dzieje i w ogóle nie odbiera tych danych.

0

A tam gdzie tego flux tworzysz to jesteś w innym @Bean - metoda ma adnotację? i jest odpowiednio wywołana? Jako bean. (Jakobin :-) )

0

Flux tworzyłem w ten sposób:

    @GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> getJms() {
        return Flux.from(tenMojPublisher()).map(v -> v.getPayload());
    }

Lub w ten sposób:

    @GetMapping(value = "/events", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> getJms() {
        Flux.from(tenMojPublisher()).map(v -> v.getPayload()).subscribe(message -> System.out.println(message));
        return Flux.empty();
    }

I zero jakichkolwiek wartości. Za to Publisher na pewno ma dane odebrane z kolejki i jest subskrybowany, na dowód wkleję fragment z konsoli:

2019-05-22 09:44:00.366 INFO 12736 --- [erContainer#0-1] o.s.integration.handler.LoggingHandler : GenericMessage [payload=hello queue world, headers={jms_redelivered=false, jms_destination=queue://test, id=94cfa42d-9bc5-a30c-1651-fa70b2a25549, priority=4, jms_timestamp=1558511040363, jms_messageId=ID:L4WW315-55592-1558511023962-5:93:1:1:1, timestamp=1558511040366}]

2

Myślę, że możesz użyć procesorów:

DirectProcessor<Message> processor = DirectProcessor.create(); 
FluxSink<Message> sink = processor.sink(FluxSink.OverflowStrategy.BUFFER);
// ...
consumer.setMessageListener(sink::next); // consumer - JMS MessageConsumer
// ...
processor
    .doOnNext(it -> { /* ... */ })
    .subscribe();

W Twoim projekcie pewnie ten sink musisz zdefiniować jako bean który zostanie wstrzygnięty do MessageListenerContainer.
To jakiego procesora użyć dokładnie i overflow strategy to już musisz sobie doczytać.

1 użytkowników online, w tym zalogowanych: 0, gości: 1