Using only Atomikos, an implementation of JTA and XA, I made a simple example that runs processing on multiple threads within a single transaction.
The complete project is available on my GitHub.
Implementation
First, we initialize the DataSource and the TransactionManager:
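import java.sql.SQLException;
import java.util.Properties;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

public class AtomikosDataSource {

    // Atomikos implementations
    private static UserTransactionManager utm;
    private static AtomikosDataSourceBean adsb;

    // initialize resources
    public static void init() {
        utm = new UserTransactionManager();
        try {
            utm.init();
            adsb = new AtomikosDataSourceBean();
            adsb.setMaxPoolSize(20);
            adsb.setUniqueResourceName("postgres");
            adsb.setXaDataSourceClassName("org.postgresql.xa.PGXADataSource");
            Properties p = new Properties();
            p.setProperty("user", "postgres");
            p.setProperty("password", "0");
            p.setProperty("serverName", "localhost");
            p.setProperty("portNumber", "5432");
            p.setProperty("databaseName", "postgres");
            adsb.setXaProperties(p);
            // initialize eagerly; otherwise the underlying XADataSource may only
            // be created on the first getConnection() call
            adsb.init();
        } catch (SystemException | SQLException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    // accessors used by the code below
    public static AtomikosDataSourceBean getDS() {
        return adsb;
    }

    public static TransactionManager getTM() {
        return utm;
    }
}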
Next, a task, executed on a worker thread, that receives the main transaction instance (Transaction) and does the processing:
// Task executed on a worker thread: it enlists its own XA connection
// in the transaction started by the main thread
private static class Processamento implements Callable<Integer> {
    private final int id;
    private final boolean falhar;
    private final Transaction transaction;

    public Processamento(int id, boolean falhar, Transaction transaction) {
        this.id = id;
        this.falhar = falhar;
        this.transaction = transaction;
    }

    @Override
    public Integer call() throws Exception {
        if (falhar) {
            throw new RuntimeException("Failed unexpectedly!");
        }
        // enlist the XA connection in the main transaction
        XAConnection xac = AtomikosDataSource.getDS().getXaDataSource().getXAConnection();
        synchronized (transaction) {
            transaction.enlistResource(xac.getXAResource());
        }
        // normal execution: update this row to 'ok'
        try (Connection c = xac.getConnection();
             PreparedStatement s = c.prepareStatement(
                     "update teste set processado = 'ok' where id = ?")) {
            s.setInt(1, id);
            s.executeUpdate();
        }
        // delist the XA connection; the XAConnection itself is not closed here,
        // since its XAResource still takes part in prepare/commit
        synchronized (transaction) {
            transaction.delistResource(xac.getXAResource(), XAResource.TMSUCCESS);
        }
        return id;
    }
}
Note that instead of relying on the connections handed out by the AtomikosDataSourceBean, I work directly with the XA API.
The call AtomikosDataSource.getDS().getXaDataSource().getXAConnection() obtains an XA connection from the underlying XA data source, and its XAResource is then enlisted in the main transaction with transaction.enlistResource(xac.getXAResource()).
I synchronized the enlist and delist calls on the transaction because my tests intermittently threw a NullPointerException. This should not be a problem if you use connections sparingly, that is, without opening and closing them all the time.
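The manual enlistment is needed because a JTA TransactionManager associates the transaction started with begin() only with the calling thread; a pooled worker thread has no transaction of its own, so a connection it obtains is not enlisted automatically. The toy model below uses hypothetical classes and plain JDK code (no Atomikos) and represents that thread association with a ThreadLocal, which is an assumption about the general mechanism, not Atomikos's actual implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model (hypothetical, not Atomikos code) of a transaction manager that
// binds the "current transaction" to the thread that called begin().
public class ThreadBoundTxDemo {

    // Hypothetical stand-in for a transaction object
    static final class ToyTransaction {
        final String name;
        ToyTransaction(String name) { this.name = name; }
    }

    // Hypothetical stand-in for the manager's per-thread association
    static final class ToyTransactionManager {
        private final ThreadLocal<ToyTransaction> current = new ThreadLocal<>();

        void begin(String name) { current.set(new ToyTransaction(name)); }

        // Like TransactionManager.getTransaction(): returns the transaction
        // associated with the calling thread, or null if there is none
        ToyTransaction getTransaction() { return current.get(); }
    }

    public static void main(String[] args) throws Exception {
        ToyTransactionManager tm = new ToyTransactionManager();
        tm.begin("main-tx");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // A pooled worker asks the manager for "its" transaction: none is bound
            Future<ToyTransaction> seenByWorker = executor.submit(tm::getTransaction);
            System.out.println("main thread sees: " + tm.getTransaction().name);
            System.out.println("worker sees: " + seenByWorker.get());

            // Passing the reference explicitly, as Processamento does, works
            ToyTransaction mainTx = tm.getTransaction();
            Future<String> explicit = executor.submit(() -> mainTx.name);
            System.out.println("worker with explicit reference: " + explicit.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

Running main prints main-tx for the main thread, null for the worker, and main-tx again when the reference is passed explicitly, which mirrors why Processamento receives the Transaction in its constructor.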
Finally, a method that runs five instances of the task above in a thread pool:
public static int processar(boolean falhar) {
    int ok = 0;
    Transaction transaction = null;
    // create a pool with five worker threads
    ExecutorService executor = Executors.newFixedThreadPool(5);
    try {
        // start the transaction; it is associated with the current (main) thread
        AtomikosDataSource.getTM().begin();
        transaction = AtomikosDataSource.getTM().getTransaction();
        // create 5 tasks, passing the main transaction as argument;
        // when falhar is true, the last one fails
        List<Callable<Integer>> processos = new ArrayList<Callable<Integer>>();
        for (int i = 0; i < 5; i++) {
            processos.add(new Processamento(i + 1, i == 4 && falhar, transaction));
        }
        // run all tasks and wait for them to finish
        List<Future<Integer>> futures = executor.invokeAll(processos);
        // count the results; get() fails if the task threw an exception
        Throwable ex = null;
        for (Future<Integer> future : futures) {
            try {
                int threadId = future.get();
                System.out.println("Thread " + threadId + " succeeded!");
                ok++;
            } catch (Throwable e) {
                ex = e;
            }
        }
        if (ex != null) {
            throw ex;
        }
        // all tasks succeeded: commit through the TransactionManager,
        // which also disassociates the transaction from this thread
        AtomikosDataSource.getTM().commit();
    } catch (Throwable e) {
        e.printStackTrace();
        try {
            // try to roll back
            if (transaction != null) {
                AtomikosDataSource.getTM().rollback();
            }
        } catch (IllegalStateException | SecurityException | SystemException e1) {
            e1.printStackTrace();
        }
    } finally {
        // release the worker threads
        executor.shutdown();
    }
    return ok;
}
I tested both a success scenario and a failure scenario to validate the result.
In the success scenario, each of the five threads updates one row of the TESTE table to ok, and the main thread then commits the transaction.
In the failure scenario, the last thread always throws an exception, forcing the rollback of the updates made by the other four threads.
See the code on GitHub for more details.
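The expected outcome of the two scenarios can be illustrated without a database. The sketch below is only an in-memory analogy, assuming a hypothetical SharedTx class in place of the XA transaction: workers record their update in the shared transaction through synchronized methods (like the synchronized enlist/delist blocks above), and the main thread applies all updates on commit or discards all of them on rollback:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// In-memory simulation (hypothetical, no database, no XA) of the two tests.
public class AllOrNothingDemo {

    // Hypothetical shared transaction: workers stage their writes concurrently
    static final class SharedTx {
        private final List<Integer> staged = new ArrayList<>();

        synchronized void enlist(int rowId) { staged.add(rowId); }

        synchronized void commit(Map<Integer, String> table) {
            for (int id : staged) {
                table.put(id, "ok");
            }
            staged.clear();
        }

        synchronized void rollback() { staged.clear(); }
    }

    // Runs five workers; if falhar is true, the last one throws.
    // Returns the final contents of the simulated TESTE table.
    static Map<Integer, String> processar(boolean falhar) throws InterruptedException {
        Map<Integer, String> table = new TreeMap<>();
        for (int id = 1; id <= 5; id++) {
            table.put(id, "pending");
        }
        SharedTx tx = new SharedTx();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                final int id = i + 1;
                final boolean fail = falhar && i == 4;
                tasks.add(() -> {
                    if (fail) {
                        throw new IllegalStateException("Failed unexpectedly!");
                    }
                    tx.enlist(id);
                    return id;
                });
            }
            boolean anyFailed = false;
            for (Future<Integer> f : executor.invokeAll(tasks)) {
                try {
                    f.get();
                } catch (ExecutionException e) {
                    anyFailed = true;
                }
            }
            if (anyFailed) {
                tx.rollback();      // none of the staged updates reach the table
            } else {
                tx.commit(table);   // all five updates are applied together
            }
        } finally {
            executor.shutdown();
        }
        return table;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("success: " + processar(false));
        System.out.println("failure: " + processar(true));
    }
}
```

processar(false) leaves every row as ok, while processar(true) leaves all five rows as pending, which is the all-or-nothing behavior the real tests check in the TESTE table.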
Notes on the configuration
I used PostgreSQL in the example. I had to set max_prepared_transactions to a value greater than zero in the postgresql.conf configuration file; the default of 0 disables prepared transactions, which the XA two-phase commit relies on.
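For reference, a minimal sketch of that setting. The value 100 is only an example, assuming the default max_connections of 100; the PostgreSQL documentation suggests at least max_connections when prepared transactions are used. The parameter only takes effect after a server restart, and SHOW max_prepared_transactions; in psql confirms the active value:

```
# postgresql.conf
# 0 (the default) disables prepared transactions, so the XA prepare phase fails.
# Requires a server restart to take effect.
max_prepared_transactions = 100
```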
It is important to check whether your database driver supports distributed (XA) transactions. I read somewhere that MySQL may have some problems with this.
Remarks
To make the example work with Spring, simply declare the objects created manually here as Spring beans, either in XML or with annotations, whichever you prefer.
Be careful when implementing something like this inside an application server, so as not to interfere with the server's normal transaction handling.
Personally, I don’t see a real need for parallel processing within the same transaction. It is much more efficient to divide the processing into transactional blocks. There are several techniques to do this without leaving the system in an inconsistent state, for example an additional column in the table or even a separate control table.
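As an illustration of the per-block alternative, here is an in-memory sketch with hypothetical names (Block, run): each block is committed on its own, and a processed flag, standing in for the extra column, lets a rerun resume where the previous run stopped instead of rolling everything back:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

// In-memory sketch (hypothetical) of per-block processing with a "processed" flag.
public class BlockProcessingDemo {

    // Hypothetical row of a control table: one entry per block of work
    static final class Block {
        final int id;
        boolean processed; // stands in for the extra "processed" column
        Block(int id) { this.id = id; }
    }

    // Processes every block not yet marked as processed. `work` returns false to
    // simulate a failure; the flag is set only when the block's work succeeds,
    // which in a real system must happen in the same local transaction.
    static int run(List<Block> blocks, IntPredicate work) {
        int done = 0;
        for (Block b : blocks) {
            if (b.processed) {
                continue; // already committed in a previous run
            }
            if (!work.test(b.id)) {
                return done; // stop; earlier blocks stay committed
            }
            b.processed = true;
            done++;
        }
        return done;
    }

    public static void main(String[] args) {
        List<Block> blocks = new ArrayList<>();
        for (int id = 1; id <= 5; id++) {
            blocks.add(new Block(id));
        }
        // First run fails on block 4: blocks 1-3 are committed and flagged
        System.out.println("first run: " + run(blocks, id -> id != 4));
        // Rerun after fixing the problem: only blocks 4 and 5 are processed
        System.out.println("second run: " + run(blocks, id -> true));
    }
}
```

With the failure on block 4, the first run commits three blocks and the second run processes only the remaining two, so no work is repeated and no block is left half-done.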
Luiz, you may not see any reason for this, but the need exists, in my case and typically in many others, and it is a prerequisite for keeping the data consistent without delegating that responsibility to other programmatic block-reprocessing mechanisms, often called compensating transactions. Contrary to what you stated, the XA protocol ensures that a global transaction orchestrates the other transactions and keeps the database in a consistent state.
– Alberto Fragalá Neto
@Albertofragaláneto I did not say that XA does not do this, only that I do not see much sense in it in this context. To this day I have not come across a database with such a volume of data that it was necessary to keep a single transaction while processing in parallel. As for consistency, I see nothing preventing you from ensuring it per block. From my experience working with large volumes, I maintain my position: in general, this "all or nothing" consistency requirement is unnecessary if the processing is well planned. Is there anything preventing your system from having a "processed" flag for each block?
– utluiz
@Albertofragaláneto Anyway, I will update the answer with an example project I built.
– utluiz
Luiz, after several tests, I noticed that the connections of each thread are not being enlisted. They are committed independently, as soon as the "executeUpdate" method is invoked. You can observe this by running only the first "success()" test, and only that one, and calling rollback on the TransactionManager instead of commit at the end: all rows are still updated to "ok". I made sure the parameter you mentioned was set, as shown in the log (INFO: USING com.atomikos.icatch.threaded_2pc = true). What could be happening?
– Alberto Fragalá Neto
Luiz, I’ve been reading the documentation of the parameter you pointed me to, and it does not actually do what you proposed. The documentation states that the multithreading applies only to the two-phase commit process itself, for performance when many resources are involved in the transaction: (http://www.atomikos.com/Documentation/JtaProperties)
– Alberto Fragalá Neto
@Albertofragaláneto Good news. I managed to work on the project tonight. I was indeed wrong about the parameter mentioned in the previous version of the answer; my "success" test must have been a coincidence. I did the enlistment manually, using a reference to the Transaction from the main thread, and got the results I think you’re looking for. Good luck with your actual implementation!
– utluiz
Luiz, thank you very much for the time you spent on this! Just to wrap up: can we conclude, then, that no JTA implementation guarantees this in a non-programmatic way?
– Alberto Fragalá Neto
@Albertofragaláneto Some implementation may have a parameter like the one I initially thought did this, but I don’t know of a non-programmatic one. As far as I understand, the JTA architecture itself is built around threads, since the thread-per-request model is the one used by all Java EE servers.
– utluiz