Czy spotkał się ktoś z takim problemem podczas korzystania z Atomikosa?
com.atomikos.jms.internal.AtomikosTransactionRequiredJMSException: The JMS session you are using requires a JTA transaction context for the calling thread and none was found.
Please correct your code to do one of the following:
- start a JTA transaction if you want your JMS operations to be subject to JTA commit/rollback, or
- create a non-transacted session and do session acknowledgment yourself, or
- set localTransactionMode to true so connection-level commit/rollback are enabled.
Kod jaki mam wygląda następująco:
Konfiguracja connectora do kolejki mq ibm oraz transaction managera
package ibm.connect.ibmconnection.mq.listener;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.ibm.mq.jakarta.jms.MQXAConnectionFactory;
import com.ibm.mq.spring.boot.MQConfigurationProperties;
import com.ibm.mq.spring.boot.MQConnectionFactoryCustomizer;
import com.ibm.mq.spring.boot.MQConnectionFactoryFactory;
import ibm.connect.ibmconnection.mq.MqProperties;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Session;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import java.util.List;
@Configuration
@EnableTransactionManagement
public class ConnectionFactoryConfiguration {
public static final String CUSTOM_FACTORY = "CUSTOM_FACTORY";
public static final String CUSTOM_CONTAINER = "CUSTOM_CONTAINER";
@Bean(CUSTOM_CONTAINER)
JmsListenerContainerFactory<?> container(@Qualifier(CUSTOM_FACTORY) ConnectionFactory mqConnectionFactory,
PlatformTransactionManager transactionManager) {
DefaultJmsListenerContainerFactory container = new DefaultJmsListenerContainerFactory();
container.setConnectionFactory(mqConnectionFactory);
container.setTransactionManager(transactionManager);
container.setSessionTransacted(true);
container.setSessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
return container;
}
@SneakyThrows
@Bean(CUSTOM_FACTORY)
ConnectionFactory connectionFactory(MqProperties properties, List<MQConnectionFactoryCustomizer> factoryCustomizers) {
MQXAConnectionFactory connectionFactory = new MQConnectionFactoryFactory(create(properties), factoryCustomizers).createConnectionFactory(
MQXAConnectionFactory.class
);
connectionFactory.setPort(properties.getPort());
connectionFactory.setHostName(properties.getHost());
var atomikos = new AtomikosConnectionFactoryBean();
atomikos.setXaConnectionFactory(connectionFactory);
atomikos.setUniqueResourceName("custom-atomikos-unique-resource-name");
atomikos.setPoolSize(5);
atomikos.setLocalTransactionMode(false);
return atomikos;
}
@Bean
@Primary
public PlatformTransactionManager transactionManager(UserTransactionManager atomikosTransactionManager, UserTransactionImp atomikosUserTransaction) throws Throwable {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager(atomikosTransactionManager);
manager.setUserTransaction(atomikosUserTransaction);
manager.setAllowCustomIsolationLevels(true);
return manager;
}
@Bean(initMethod = "init", destroyMethod = "close")
public UserTransactionManager userTransactionManager() {
UserTransactionManager userTransactionManager = new UserTransactionManager();
userTransactionManager.setForceShutdown(false);
userTransactionManager.setStartupTransactionService(true);
return userTransactionManager;
}
@Bean
public UserTransactionImp atomikosUserTransaction() throws Throwable {
return new UserTransactionImp();
}
private MQConfigurationProperties create(MqProperties properties) {
MQConfigurationProperties props = new MQConfigurationProperties();
props.setPassword(properties.getPassword());
props.setUser(properties.getUser());
props.setChannel(properties.getChannel());
props.setQueueManager(properties.getQueueManager());
return props;
}
}
Konfiguracja listenera:
package ibm.connect.ibmconnection.mq.api;
import ibm.connect.ibmconnection.db.MessageService;
import jakarta.jms.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import static ibm.connect.ibmconnection.mq.listener.ConnectionFactoryConfiguration.CUSTOM_CONTAINER;
import static ibm.connect.ibmconnection.mq.listener.ConnectionFactoryConfiguration.CUSTOM_FACTORY;
@Component
public class MqApiImpl implements MqApi {
private final MessageService service;
private final JmsTemplate jmsTemplate;
public MqApiImpl(MessageService service, @Qualifier(CUSTOM_FACTORY) ConnectionFactory mqConnectionFactory) {
this.jmsTemplate = new JmsTemplate(mqConnectionFactory);
this.service = service;
}
@Override
public void send(String destination, String message) {
jmsTemplate.convertAndSend(destination, message);
}
@JmsListener(destination = "${connector.queue}", containerFactory = CUSTOM_CONTAINER)
@Override
public void receive(String content) {
service.receive(content);
}
}
Bez atomikosa (AtomikosConnectionFactoryBean) oraz transactionManagera (UserTransactionManager), jak oprę connector tylko o interfejs XAConnectionFactory to wszystko działa - test robię tak, że strzelam endpointem na konkretny kanał i z niego nasłuchuję.
Po dodaniu atomikosa jest komunikat który dałem na samej górze.
MessageService jedyne co robi to odpala metodę w której jest zapis do bazy stringa który przychodzi z kolejki.
Wielkie dzięki jeśli ktoś się pochyli nad tym problemem ze mną.