Jaki system kolejkowy

0

Jaki system kolejkowy polecicie?
Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły. Dane będą w postaci binarnej. Musi być możliwość osadzenia w aplikacji (bez zewnętrznych narzędzi).
Co myślicie o ActiveMQ?
Jest tyle technologi kolejek na rynku że ciężko zdecydować się na jakieś konkretne rozwiązanie.

0

Hm, jeśli ma być osadzony w aplikacji to ubija to możliwość użycia wielu niezawodnych rozwiązań jak Kafka

0

Jaka skala ruchu? Co to znaczy "najwydajniejszy"?

0

system będzie obrabiał około 20 milionów wiadomości dziennie

2
volau napisał(a):

system będzie obrabiał około 20 milionów wiadomości dziennie

To, że system będzie obrabiał ileś czegoś, to znaczy nic.

  • Batchowo, online?
  • Jaki jest max. czas na obsłużenie wiadomości?
  • Ile tych wiadomości w szczycie będziesz miał na sekundę?
  • Jaki jest max. czas oczekiwania na obsłużenie wiadomości?
  • Wiadomości są trwałe, ulotne?
  • Możesz coś zgubić?
  • Jaki jest rozmiar wiadomości?
2

Jaki najwydajniejszy, niezawodny możliwy do tworzenia kopi kolejek na inne węzły.

Co to znaczy stworzyć kopię kolejki? Kopię metadanych, danych (historycznych/ nieprzetworzonych/ wszystkich), czegoś innego?

2

Czyli masz +/- 250 na sekundę, z taką ilością to sobie każda kolejka powinna poradzić. ActiveMQ tak spokojnie po kilkanaście tysięcy na sekundę da radę, a Kaffka nawet i kilkaset tysięcy na sekundę.

0

W najwyższym chwilowym ruchu może dojść do 2000/s ale to rzadko. Wielkość danych może oscylować od kilku kb do max 5mb (te większe dużo rzadziej). Jeśli ActiveMQ ma taką wydajność to prawdopodobnie padnie wybór na tą technologie.

2

Z tymi większymi rozmiarem wiadomościami (liczonymi w MB) to bym uważał. Jak payload jest większy to często się go wrzuca na jakiś storage, a w wiadomości wrzucanej na kolejkę jest tylko link do payloada na storage, skąd odbiorca wiadmości może sobie go pobrać.

0

Robię próbę z ActiveMQ. Mam uruchomionego brokera jako embedded z persistence na true. Mam około 20 kolejek. I tworzę połączenie i sesje per kolejke. Wysyłam na kolejki BytesMessage. Mam problem w wydjanością a dokłądnie spędza średnio 10-50milisekund na metodzie messageProducer.send(message). Jakies pomysł co może byc powodem?

Klasa obsługująca kolejkę (na razie taka forma bo robimy wydmuszke aby sprawdzic czy jest lepsze od obecnie uzytej technologi (kod jeszcze do refactoringu))

import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.*;

public class ActiveMQQueue {
    private static final Logger log = LoggerFactory.getLogger(ActiveMQQueue.class);
    private Connection connection;
    private Session session;

    private String queueName;
    private Destination destination;
    private MessageProducer messageProducer;
    private MessageConsumer messageConsumer;
    

    public ActiveMQQueue(String queueName) throws JMSException {
            this.queueName = queueName;
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            destination = session.createQueue(queueName);
            messageProducer = session.createProducer(destination);
            messageConsumer = session.createConsumer(destination);
            log.info("CREATED QUEUE FOR: " + queueName);
    }

    private byte[] readBody(BytesMessage message)  throws JMSException {
        if(message != null) {
            byte[] data = new byte[(int) message.getBodyLength()];
            message.readBytes(data);
            return data;
        }

        return null;
    }

    public byte[] peek() throws JMSException {
        long start = System.currentTimeMillis();
        BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("PEEK: {}", time);
        return readBody(message);
    }

    public byte[] pop() throws JMSException {
        long start = System.currentTimeMillis();
        BytesMessage message = (BytesMessage) messageConsumer.receiveNoWait();

        if(message != null) {
            message.acknowledge();
        }
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("POP: {}", time);
        return readBody(message);
    }

    public void push(byte[] data) throws JMSException {

        BytesMessage message = session.createBytesMessage();
        message.writeBytes(data);
        long start = System.currentTimeMillis();
        messageProducer.send(message);
        long time =  (System.currentTimeMillis()-start);
        if(time > 5) log.info("PUSH: {}",time);
    }

    public String getName() {
        return queueName;
    }

    public Destination getDestination() {
        return destination;
    }

    public int size() {
        return 0; //TODO
    }

}

1 użytkowników online, w tym zalogowanych: 0, gości: 1