atomikos       

atomikos

TransactionEssentials:开源的免费产品

ExtremeTransactions:上商业版,需要收费。

image-20211208122451428

TransactionEssentials:

1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

Transaction实现是com.atomikos.icatch.jta.TransactionImp

2、针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

在上一节我们讲解JTA规范时,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器(TM),并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。典型的XADataSource实现包括:

1、mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

2、阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource

3、tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource

而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常:

com.atomikos.jdbc.AtomikosSQLException: The class 'com.mchange.v2.c3p0.ComboPooledDataSource' specified by property 'xaDataSourceClassName' does not implement the required interface   javax.jdbc.XADataSource.   Please make sure the spelling is correct, and check your JDBC driver vendor's documentation.
public class MysqlXADemo {
    
    public static void main(String[] args) throws SQLException {
        //true表示打印XA语句,,用于调试
        boolean logXaCommands = true;
        // 获得资源管理器操作接口实例 RM1
        Connection conn1 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_user", "root", "root");
        XAConnection xaConn1 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn1, logXaCommands);
        XAResource rm1 = xaConn1.getXAResource();
        
        // 获得资源管理器操作接口实例 RM2
        Connection conn2 = DriverManager.getConnection
                ("jdbc:mysql://localhost:3306/db_account", "root", "root");
        XAConnection xaConn2 = new MysqlXAConnection(
                (com.mysql.jdbc.Connection) conn2, logXaCommands);
        XAResource rm2 = xaConn2.getXAResource();
        
        // AP请求TM执行一个分布式事务,TM生成全局事务id
        byte[] gtrid = "g12345".getBytes();
        int formatId = 1;
        try {
            
            // ==============分别执行RM1和RM2上的事务分支====================
            // TM生成rm1上的事务分支id
            byte[] bqual1 = "b00001".getBytes();
            Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
            // 执行rm1上的事务分支
            rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
            PreparedStatement ps1 = conn1.prepareStatement(
                    "INSERT into user(name) VALUES ('Fox')");
            ps1.execute();
            rm1.end(xid1, XAResource.TMSUCCESS);
            
            // TM生成rm2上的事务分支id
            byte[] bqual2 = "b00002".getBytes();
            Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
            // 执行rm2上的事务分支
            rm2.start(xid2, XAResource.TMNOFLAGS);
            PreparedStatement ps2 = conn2.prepareStatement(
                    "INSERT into account(user_id,money) VALUES (1,10000000)");
            ps2.execute();
            rm2.end(xid2, XAResource.TMSUCCESS);
            
            // ===================两阶段提交================================
            // phase1:询问所有的RM 准备提交事务分支
            int rm1_prepare = rm1.prepare(xid1);
            int rm2_prepare = rm2.prepare(xid2);
            // phase2:提交所有事务分支
            boolean onePhase = false;
            //TM判断有2个事务分支,所以不能优化为一阶段提交
            if (rm1_prepare == XAResource.XA_OK
                    && rm2_prepare == XAResource.XA_OK) {
                //所有事务分支都prepare成功,提交所有事务分支
                rm1.commit(xid1, onePhase);
                rm2.commit(xid2, onePhase);
            } else {
                //如果有事务分支没有成功,则回滚
                rm1.rollback(xid1);
                rm2.rollback(xid2);
            }
        } catch (XAException e) {
            // 如果出现异常,也要进行回滚
            e.printStackTrace();
        }
    }
}
public class AtomikosDemo {
 
   private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {
      // 连接池基本属性
      Properties p = new Properties();
      p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);
      p.setProperty("user", "root");
      p.setProperty("password", "root");
 
      // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource
      AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
      //设置resourceName 唯一
      ds.setUniqueResourceName(dbName);
      ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
      ds.setXaProperties(p);
      return ds;
   }
 
   public static void main(String[] args) {
 
      AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");
      AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");
 
      Connection conn1 = null;
      Connection conn2 = null;
      PreparedStatement ps1 = null;
      PreparedStatement ps2 = null;
 
      UserTransaction userTransaction = new UserTransactionImp();
      try {
         // 开启事务
         userTransaction.begin();
 
         // 执行db1上的sql
         conn1 = ds1.getConnection();
         ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);
         ps1.setString(1, "Fox");
         ps1.executeUpdate();
         ResultSet generatedKeys = ps1.getGeneratedKeys();
         int userId = -1;
         while (generatedKeys.next()) {
            // 获得自动生成的userId
            userId = generatedKeys.getInt(1);
         }
 
         // 模拟异常 ,直接进入catch代码块,2个都不会提交
        //int i=1/0;
 
         // 执行db2上的sql
         conn2 = ds2.getConnection();
         ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");
         ps2.setInt(1, userId);
         ps2.setDouble(2, 10000000);
         ps2.executeUpdate();
 
         // 两阶段提交
         userTransaction.commit();
      } catch (Exception e) {
         try {
            e.printStackTrace();
            userTransaction.rollback();
         } catch (SystemException e1) {
            e1.printStackTrace();
         }
      } finally {
         try {
            ps1.close();
            ps2.close();
            conn1.close();
            conn2.close();
            ds1.close();
            ds2.close();
         } catch (Exception ignore) {
         }
      }
   }
}

参考

http://www.tianshouzhi.com/api/tutorials/distributed_transaction/386