Wysyłanie komunikatów JMS w transakcji JTA

0

Próbuję sobie transakcje dla komunikatów JMS. Mam stworzoną swoją kolejkę MyQueue na serverze (JBoss 5.1). Chcę w transakcji z użyciem UserTransaction wysyłać do niej komunikaty aby potem (lub w trakcie) odbierać je przez drugiego klienta przez "receive()" (połączenie zdalne). Ten schemat bez transakcji wszystko działa jak powinno. Ale po wprowadzeniu transakcji działa dokładnie tak samo, jakby transakcja nie działała. Nawet jak wprowadzę na koniec transakcji polecenie rollback u klienta wysyłającego to i tak klient odbierający otrzymuje te message z kolejki. Według dokumentacji JMS transakcja powinna wstrzymywać rzeczywiste wysyłanie komunikatów od klienta do czasu wykonania commit(). Ja mając odpalonego klienta odbierajacego widze że odbiera on komunikaty z kolejki nawet przez wykonaniem commit() przez klienta wysyłającego. Jaki robię błąd w tej transakcji? (status transakcji wskazuje że działa trans.getStatus() )
Czy rodzaj obiektu Connection ma znacznie ? Ja pobierałem z serwera do klienta wysyłającego zarówno standardowe ConnectionFactory jak i XAConnectionFactory aby stworzyć Connection ale Transakcja dla nich działa jak opisałem powyżej. Może musze stworzyć własną transakcyjną ConnectionFactory ?
Czy to może być wina domyślnej bazy JBossa - HSQLDB ? Wydaje mi się że nie bo to prosty przykład i chba ta baza powinna starczyć do jego wykonania.
Załączam kod klienta wysyłajacego:

 import java.rmi.RemoteException;
import java.util.Enumeration;
import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ConnectionMetaData;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XAQueueConnection;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;


import org.omg.CosTransactions.Status;

 //Klient wysyła komunikaty do kolejki w transakcji i czeka z ich potwierdzniem  (commit()) 
// W tym czasie drugi klient (Message_Cli_5_B) stara sie je pobrać (te komunikaty) z tej kolejki


public class Main {

	/* (non-Java-doc)
	 * @see java.lang.Object#Object()
	 */
	public Main() {
		super();
	}

	
    public static void main(String[] args) {
    
		
		
	    Properties props=new Properties();
	    props.put(Context.INITIAL_CONTEXT_FACTORY, "org.jboss.naming.NamingContextFactory"); 
	    props.put(Context.PROVIDER_URL, "jnp://localhost:1099");
	    props.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
		
   try {
    	InitialContext ctx = new InitialContext(props);

    	   UserTransaction trans = (UserTransaction)  ctx.lookup("UserTransaction");	
    	   System.out.println("Klient UsrTransaction działa:   " + trans.toString());
    	
    	   try {
			trans.begin();        // ---------------początek transakcji
		} catch (NotSupportedException e1) {
			e1.printStackTrace();
		} catch (SystemException e1) {
			e1.printStackTrace();
		}
    	   
    	   

    	   ConnectionFactory factory = (ConnectionFactory) ctx.lookup("ConnectionFactory");
    	   

    	  
    	   Queue queue = (Queue) ctx.lookup("queue/KyQueue");	 // inna kolejka tutaj
    	   
    	try {
 
    		Connection connect = factory.createConnection();
    		
			Session session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
		    
    	    MessageProducer producer = session.createProducer(queue);
    	    

    	    
    	    try {
             	Thread.sleep(3000);
             } catch (InterruptedException e) {
	          e.printStackTrace();
             }
    	    
    	//    connect.start();
    	    
    	    System.out.println("Klient  - początek wysyłania messagów");	
    	    
    	    // tworzenie komunikatów - 3 komunikaty
    	     for (int n = 0; n < 3; n++ ) {
    	        MapMessage  message_1 = session.createMapMessage();
    	        if (n < 3) {
    	          message_1.setString("has next?", "yes");
    	        } else {
    	        	 message_1.setString("has next?", "no");
    	        }
    	           message_1.setInt("dana_1", (int) (10*(n + 1)*Math.pow(10, n)));
    	           producer.send(message_1);
    	           System.out.println("Klient, wysłano message nr: " + (n + 1));	
    	                  
    	         try {
		             	Thread.sleep(2000);
		             } catch (InterruptedException e) {
			          e.printStackTrace();
		             }
     	
    	     }
    	     
    	  
    	     
    	     try {
	             	Thread.sleep(5000);
	             } catch (InterruptedException e) {
		          e.printStackTrace();
	             }
    	     
    	     
    	
    	     
    	     System.out.println("Klient, wykonywanie commit lub rollback");
    	     producer.close();
    	     session.close();
    	     connect.close();
 	    
    	    
    	 try {
			trans.rollback();  // wycofanie trasakcji
		} catch (Exception e) {

			e.printStackTrace();
		}
    	 
    	 
    	 
    	 
    	     
    	} catch (JMSException e) {
		
			e.printStackTrace();
		}
    	
    	} catch (NamingException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	
	
	}
	
	
	
} 
0

To wywolanie:
connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
korzysta z funkcji z taka deklaracja w API:

Session createSession(boolean transacted, acknowledgeMode) JMSException

Czy widzisz juz swoj blad?

0

To chyba nie jest ten błąd. Opcja "true" w tym miejscu otwiera tranakcję lokalną która musi być komitowana przez session.commit() a nie trans.commit() (UserTransaction). Ja chcę korzystać z transakcji rozproszonej przez UserTransaction. W przykładzie z którego korzystam w tym miejscu jest "false" przy otwieraniu transakcji UserTrans.

1 użytkowników online, w tym zalogowanych: 0, gości: 1