Atomikos i transakcje rozproszone

0

Czy spotkał się ktoś z takim problemem podczas korzystania z Atomikosa?

com.atomikos.jms.internal.AtomikosTransactionRequiredJMSException: The JMS session you are using requires a JTA transaction context for the calling thread and none was found.
Please correct your code to do one of the following:

  1. start a JTA transaction if you want your JMS operations to be subject to JTA commit/rollback, or
  2. create a non-transacted session and do session acknowledgment yourself, or
  3. set localTransactionMode to true so connection-level commit/rollback are enabled.

Kod jaki mam wygląda następująco:

Konfiguracja connectora do kolejki mq ibm oraz transaction managera

package ibm.connect.ibmconnection.mq.listener;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.ibm.mq.jakarta.jms.MQXAConnectionFactory;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryCustomizer;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import ibm.connect.ibmconnection.mq.MqProperties;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Session;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import java.util.List;

@Configuration
@EnableTransactionManagement
public class ConnectionFactoryConfiguration {

    public static final String CUSTOM_FACTORY = "CUSTOM_FACTORY";
    public static final String CUSTOM_CONTAINER = "CUSTOM_CONTAINER";


    @Bean(CUSTOM_CONTAINER)
    JmsListenerContainerFactory<?> container(@Qualifier(CUSTOM_FACTORY) ConnectionFactory mqConnectionFactory,
                                             PlatformTransactionManager transactionManager) {
        DefaultJmsListenerContainerFactory container = new DefaultJmsListenerContainerFactory();
        container.setConnectionFactory(mqConnectionFactory);
        container.setTransactionManager(transactionManager);
        container.setSessionTransacted(true);
        container.setSessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
        return container;

    }

    @SneakyThrows
    @Bean(CUSTOM_FACTORY)
    ConnectionFactory connectionFactory(MqProperties properties, List<MQConnectionFactoryCustomizer> factoryCustomizers) {
        MQXAConnectionFactory connectionFactory = new MQConnectionFactoryFactory(create(properties), factoryCustomizers).createConnectionFactory(
                MQXAConnectionFactory.class
        );
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setHostName(properties.getHost());

        var atomikos = new AtomikosConnectionFactoryBean();
        atomikos.setXaConnectionFactory(connectionFactory);
        atomikos.setUniqueResourceName("custom-atomikos-unique-resource-name");
        atomikos.setPoolSize(5);
        atomikos.setLocalTransactionMode(false);
        return atomikos;
    }


    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(UserTransactionManager atomikosTransactionManager, UserTransactionImp atomikosUserTransaction) throws Throwable {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager(atomikosTransactionManager);
        manager.setUserTransaction(atomikosUserTransaction);
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }

    @Bean(initMethod = "init", destroyMethod = "close")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        userTransactionManager.setForceShutdown(false);
        userTransactionManager.setStartupTransactionService(true);
        return userTransactionManager;
    }

    @Bean
    public UserTransactionImp atomikosUserTransaction() throws Throwable {
        return new UserTransactionImp();
    }

    private MQConfigurationProperties create(MqProperties properties) {
        MQConfigurationProperties props = new MQConfigurationProperties();
        props.setPassword(properties.getPassword());
        props.setUser(properties.getUser());
        props.setChannel(properties.getChannel());
        props.setQueueManager(properties.getQueueManager());
        return props;
    }


}


Konfiguracja listenera:


package ibm.connect.ibmconnection.mq.api;

import ibm.connect.ibmconnection.db.MessageService;
import jakarta.jms.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import static ibm.connect.ibmconnection.mq.listener.ConnectionFactoryConfiguration.CUSTOM_CONTAINER;
import static ibm.connect.ibmconnection.mq.listener.ConnectionFactoryConfiguration.CUSTOM_FACTORY;

@Component
public class MqApiImpl implements MqApi {

    private final MessageService service;
    private final JmsTemplate jmsTemplate;

    public MqApiImpl(MessageService service, @Qualifier(CUSTOM_FACTORY) ConnectionFactory mqConnectionFactory) {
        this.jmsTemplate = new JmsTemplate(mqConnectionFactory);
        this.service = service;
    }

    @Override

    public void send(String destination, String message) {
        jmsTemplate.convertAndSend(destination, message);
    }

    @JmsListener(destination = "${connector.queue}", containerFactory = CUSTOM_CONTAINER)
    @Override
    public void receive(String content) {
        service.receive(content);
    }
}


Bez atomikosa (AtomikosConnectionFactoryBean) oraz transactionManagera (UserTransactionManager), jak oprę connector tylko o interfejs XAConnectionFactory to wszystko działa - test robię tak, że strzelam endpointem na konkretny kanał i z niego nasłuchuję.
Po dodaniu atomikosa jest komunikat który dałem na samej górze.

MessageService jedyne co robi to odpala metodę w której jest zapis do bazy stringa który przychodzi z kolejki.

Wielkie dzięki jeśli ktoś się pochyli nad tym problemem ze mną.

0

Zapewne dodanie @Transactional przed public void send(String destination, String message) { pomoże.
Z tym, że IMO nie powinieneś się tym bawić.
JMS ze springiem/javaee to prawdziwe jądro ciemności. Tylko sie nabawisz PTSD.

1 użytkowników online, w tym zalogowanych: 0, gości: 1