How to ensure transactional atomicity across multiple concurrent threads in a Java SE environment using the Spring framework?

A Java SE solution using the Spring framework is being developed to parallelize the execution of a stored procedure in Oracle 9i. The procedure takes as its parameter one or more lines of a very large file to be processed.

This will be done with a thread pool in which each thread invokes the same stored procedure, passing different lines of the file as the parameter, in order to reduce processing time compared with a single execution of the procedure. However, transactional atomicity must be guaranteed across all executions/transactions of the procedure, as it is today for a single execution: the transactions are committed at the end, and only if no error occurred. If the execution in any thread fails, all the other transactions must be rolled back.

The natural idea was to use a JTA (XA) implementation in Spring, where each thread/transaction would be treated as a two-phase commit resource. However, I believe this design violates the principle of the JTA mechanism, which in principle only guarantees atomicity over the resources used within the same transactional method, that is, within a single thread.

How can this be guaranteed in a non-programmatic way?

1 answer

Using only Atomikos, an implementation of JTA and XA, I made a simple example that runs processing on multiple threads within a single transaction.

The complete project is available on my GitHub.

Implementation

First, the DataSource and the TransactionManager are initialized:

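import java.util.Properties;
import javax.transaction.SystemException;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

public class AtomikosDataSource {

    // Atomikos implementations
    private static UserTransactionManager utm;
    private static AtomikosDataSourceBean adsb;

    // initialize resources
    public static void init() {
        utm = new UserTransactionManager();
        try {
            utm.init();
            adsb = new AtomikosDataSourceBean();
            adsb.setMaxPoolSize(20);
            adsb.setUniqueResourceName("postgres");
            adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
            // connection properties passed on to the PostgreSQL XA driver
            Properties p = new Properties();
            p.setProperty("user", "postgres");
            p.setProperty("password", "0");
            p.setProperty("serverName", "localhost");
            p.setProperty("portNumber", "5432");
            p.setProperty("databaseName", "postgres");
            adsb.setXaProperties(p);
        } catch (SystemException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    // accessors; their shape is assumed from how the snippets below call them
    public static UserTransactionManager getTM() { return utm; }
    public static AtomikosDataSourceBean getDS() { return adsb; }
}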

Next, a task that receives the main transaction instance (Transaction):

private static class Processamento implements Callable<Integer> {

    private int id;
    private boolean falhar;
    private Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.falhar = falhar;
        this.transaction = transaction;
        this.id = id;
    }

    public Integer call() throws Exception {
        if (falhar) {
            throw new RuntimeException("Failed unexpectedly!");
        }

        //enlist xa connection
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }

        //normal execution, update row with OK
        Connection c = xac.getConnection();
        Statement s = c.createStatement();
        s.executeUpdate("update teste set processado = 'ok' where id = " + id);
        s.close();
        c.close();

        //delist xa connection
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }

}

Note that instead of using JTA, I am directly using the XA API implemented by Atomikos.

The call AtomikosDataSource.getDS().getXaDataSource().getXAConnection() obtains an XA connection, which is enlisted in the main transaction by the call transaction.enlistResource(xac.getXAResource()).
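
The explicit enlistment matters because a transaction begun through the TransactionManager is associated with the thread that called begin(), so worker threads do not pick it up on their own. A minimal sketch of that thread-bound behaviour, using a plain ThreadLocal as a hypothetical stand-in for the per-thread association (this is not Atomikos code, and the names ThreadBoundContext, CURRENT_TX and seenByWorker are made up for illustration):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundContext {

    // Hypothetical stand-in for the "current transaction" tracked per thread
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns what a pool thread sees as its "current transaction". */
    static String seenByWorker() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> CURRENT_TX.get();
            return executor.submit(task).get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_TX.set("tx-1");
        System.out.println("main:   " + CURRENT_TX.get()); // prints "main:   tx-1"
        System.out.println("worker: " + seenByWorker());   // prints "worker: null"
    }
}
```

The worker sees nothing, which is why Processamento receives the Transaction object through its constructor instead of asking the transaction manager for the current one.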

I synchronized some sections because I randomly got NullPointerExceptions during tests, but this should not be a problem if you use the connections sensibly, that is, without opening and closing them all the time.

Finally, a method that starts five instances of the task above:

public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    try {

        //start transaction
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();

        //create thread pool
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();

        //create 5 threads, passing the main transaction as argument
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }

        //execute threads and wait
        List<Future<Integer>> futures = executor.invokeAll(processos);

        //release the pool threads; every task has already finished
        executor.shutdown();

        //count the result; get() will fail if thread threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " succeeded!");
                ok++; 
            } catch (Throwable e) {
                ex = e;
            }
        }

        if (ex != null) {
            throw ex;
        }

        //finish transaction normally
        transaction.commit();

    } catch (Throwable e) {

        e.printStackTrace();
        try {
            //try to rollback
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException e1) {
            e1.printStackTrace();
        } catch (SecurityException e1) {
            e1.printStackTrace();
        } catch (SystemException e1) {
            e1.printStackTrace();
        }

    }
    return ok;
}

I ran tests for both a success and a failure scenario to validate the result.

In the success scenario, each of the five threads updates one row of the table TESTE with the value ok, and the transaction is then committed.

In the failure scenario, the last thread always throws an exception, forcing the rollback of the other four threads.
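
The commit-or-rollback decision in both scenarios can be sketched without any database. The example below is only an illustration under stated assumptions: the class AllOrNothing and its methods are hypothetical, the tasks do no real work, and the strings "commit" and "rollback" stand in for the calls made on the real transaction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class AllOrNothing {

    /** Runs every task in a pool; returns true only if none of them threw. */
    static boolean allSucceeded(List<Callable<Integer>> tasks, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            boolean ok = true;
            // invokeAll blocks until every task has completed or failed
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    ok = false; // remember the failure, keep checking the rest
                }
            }
            return ok;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false; // treat interruption as a failure
        } finally {
            executor.shutdown();
        }
    }

    /** Five dummy tasks; the fifth one fails when failLast is true. */
    static List<Callable<Integer>> tasks(boolean failLast) {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int id = i;
            tasks.add(() -> {
                if (failLast && id == 5) {
                    throw new IllegalStateException("task " + id + " failed");
                }
                return id;
            });
        }
        return tasks;
    }

    public static void main(String[] args) {
        System.out.println(allSucceeded(tasks(false), 5) ? "commit" : "rollback"); // prints "commit"
        System.out.println(allSucceeded(tasks(true), 5) ? "commit" : "rollback");  // prints "rollback"
    }
}
```

Waiting for all futures before deciding mirrors processar: no thread's work is made permanent until every thread has reported back.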

See the code on GitHub for more details.

Notes on the configuration

I used PostgreSQL in the example. The max_prepared_transactions setting in the configuration file postgresql.conf had to be set to a value greater than zero.
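
For reference, the relevant line might look like the fragment below. The value 20 is only an assumption, chosen to match the pool size used in the example. The PostgreSQL default is 0, which disables prepared transactions, and the server must be restarted for a change to take effect.

```
# postgresql.conf
max_prepared_transactions = 20   # assumed value; the default of 0 disables prepared transactions
```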

It is important to check whether your database driver supports distributed transactions. I read somewhere that MySQL may have some problems with this.

Remarks

To make the example work with Spring, simply configure the manually created classes as beans, either in XML or through annotations, at your discretion.

Be careful when implementing something like this inside an application server, so as not to interfere with the normal system transactions.

Personally, I don’t see a real need for parallel processing within the same transaction. It is much more efficient to divide the processing into transactional blocks. There are several techniques to do this without making the system state inconsistent, for example using additional columns in the table or even an extra table.
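
As a rough illustration of the block-based alternative, here is a minimal in-memory sketch. It assumes each block carries a "processed" flag (the extra column mentioned above) and that each block would be handled in its own short transaction. The class BlockProcessing and everything in it are hypothetical, and no database is involved.

```java
import java.util.ArrayList;
import java.util.List;

class BlockProcessing {

    /** Hypothetical stand-in for a group of rows with an extra "processed" column. */
    static final class Block {
        final int id;
        boolean processed;
        Block(int id) { this.id = id; }
    }

    /** Hypothetical unit of work; against a database this would be one short transaction. */
    interface Work {
        void apply(Block block) throws Exception;
    }

    static List<Block> newBlocks(int n) {
        List<Block> blocks = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            blocks.add(new Block(i));
        }
        return blocks;
    }

    /** Processes every block not yet flagged; returns how many are still pending. */
    static int run(List<Block> blocks, Work work) {
        int pending = 0;
        for (Block block : blocks) {
            if (block.processed) {
                continue; // finished in an earlier run, skip it
            }
            try {
                work.apply(block);
                block.processed = true; // would be committed together with the block's changes
            } catch (Exception e) {
                pending++; // stays unflagged, so the next run retries it
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        List<Block> blocks = newBlocks(5);
        int pending = run(blocks, b -> {
            if (b.id == 3) throw new IllegalStateException("block 3 failed");
        });
        System.out.println("pending after first run: " + pending);  // prints 1
        pending = run(blocks, b -> System.out.println("retrying block " + b.id));
        System.out.println("pending after second run: " + pending); // prints 0
    }
}
```

The trade-off is that other readers can see partially processed data between runs, which is exactly the point debated in the comments below.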

  • Luiz, you may not see any reason for this, but the need exists, in my case and typically in many others, and it is a prerequisite for ensuring data consistency without delegating that responsibility to other programmatic block-reprocessing mechanisms, often called compensating transactions. The XA protocol, contrary to what you stated, ensures that a global transaction orchestrates other transactions and guarantees a consistent database state.

  • @Albertofragaláneto I did not say that XA does not do this, only that I do not see much sense in it in this context. To this day I do not know of a database with such a volume of data where it was necessary to keep a single transaction open and process in parallel at the same time. As for consistency, I see no obstacle to guaranteeing it per block. From my experience working with large volumes, I maintain my position: in general this "all or nothing" consistency requirement is unnecessary if the processing is well planned. Does anything prevent your system from having a "processed" flag for each block?

  • @Albertofragaláneto Anyway, I will update the answer with an example project I did.

  • Luiz, after several tests I noticed that the connections of each thread are not actually being enlisted. They are committed independently, at the moment the executeUpdate method is invoked. You can observe this by running only the first test, "success()", and performing a rollback on the TransactionManager instead of a commit at the end: all rows will still be updated to "ok". I confirmed that the parameter you mentioned was set by checking the log (INFO: USING com.atomikos.icatch.threaded_2pc = true). What could be happening?

  • Luiz, I have been reading the documentation of the parameter you pointed me to, and I noticed that it does not actually do what you proposed. The documentation states that the multi-threading applies to the two-phase commit process itself, for performance when many resources are involved in the transaction: (http://www.atomikos.com/Documentation/JtaProperties)

  • @Albertofragaláneto Good news. I managed to work on the project tonight. I was indeed wrong about the parameter mentioned in the previous answer. My "success" test must have been a coincidence. I now do the enlistment manually, using a reference to the Transaction from the main thread, and got the results I think you are looking for. Good luck with your actual implementation!

  • Luiz, thank you very much for the time spent! Just to wrap up: can we conclude that no JTA implementation guarantees this in a non-programmatic way?

  • @Albertofragaláneto It may be that some implementation has a parameter like the one I initially thought did this, but I do not know of a non-programmatic one. As far as I understand it, the JTA architecture itself is tied to the thread, since the one-thread-per-request model is the one applied by all Java EE servers.
