Załóżmy, że mam jakiś tam Listener do ściągania wartości z kolejki. Przykładowa metoda:
@JmsListener(destination = "mailbox", containerFactory = "myFactory")
public void receiveMessage(Email email) {
System.out.println("Received <" + email + ">");
}
Czyli jeden ściągnięty element = jedno wywołanie metody. I to co potrzebuję zrobić, to wkładać te zdjęte wartości do Fluxa, aby po zasubskrybowaniu się do niego otrzymywać na bieżąco wartości ściągane z kolejki. Chciałem to zrobić jakoś w tym stylu:
Własny interfejs:
public interface CustomListener {
@JmsListener(destination = "test")
void receiveMessage(String email);
}
Oraz jego implementacja:
Flux.create(sink -> {
new CustomListener() {
@Override
public void receiveMessage(String email) {
sink.next(email);
}
};
});
Z tym, że niestety ta metoda niczego mi nie przechwytuje, nie wiem co robię źle. Próbowałem jeszcze na inny sposób korzystając z java.jms.MessageListener
:
Flux.<String>create(sink -> {
new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
String text = ((TextMessage) message).getText();
sink.next(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
};
});
Z tym, że tutaj wymagana jest konfiguracja w ten sposób:
@Bean
public MessageListenerContainer listenerContainer() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setMessageListener(**nazwa klasy implementującej interfejs**);
return container;
}
Jak wtedy przekazać nazwę klasy implementującą interfejs do beana konfiguracyjnego skoro MessageListener
implementuję anonimowo wewnątrz Flux.create()
? Proszę o jakąkolwiek pomoc.