DbSyncServiceImpl.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package jnpf.base.service.impl;
  2. import cn.hutool.core.collection.CollectionUtil;
  3. import com.alibaba.druid.proxy.jdbc.NClobProxyImpl;
  4. import jnpf.base.service.DbLinkService;
  5. import jnpf.base.service.DbSyncService;
  6. import jnpf.base.service.DbTableService;
  7. import jnpf.constant.TableFieldsNameConst;
  8. import jnpf.database.datatype.model.DtModelDTO;
  9. import jnpf.database.datatype.sync.util.DtSyncUtil;
  10. import jnpf.database.model.dbfield.DbFieldModel;
  11. import jnpf.database.model.dbfield.JdbcColumnModel;
  12. import jnpf.database.model.dbtable.DbTableFieldModel;
  13. import jnpf.database.model.dbtable.JdbcTableModel;
  14. import jnpf.database.model.dto.PrepSqlDTO;
  15. import jnpf.database.model.entity.DbLinkEntity;
  16. import jnpf.database.source.DbBase;
  17. import jnpf.database.sql.enums.base.SqlComEnum;
  18. import jnpf.database.sql.model.SqlPrintHandler;
  19. import jnpf.database.sql.param.FormatSqlDM;
  20. import jnpf.database.sql.param.FormatSqlKingbaseES;
  21. import jnpf.database.sql.param.FormatSqlMySQL;
  22. import jnpf.database.sql.param.FormatSqlOracle;
  23. import jnpf.database.sql.util.SqlFastUtil;
  24. import jnpf.database.util.DataSourceUtil;
  25. import jnpf.database.util.DbTypeUtil;
  26. import jnpf.database.util.JdbcUtil;
  27. import jnpf.exception.DataException;
  28. import jnpf.exception.DataTypeException;
  29. import lombok.extern.slf4j.Slf4j;
  30. import org.springframework.beans.factory.annotation.Autowired;
  31. import org.springframework.stereotype.Service;
  32. import org.springframework.transaction.annotation.Transactional;
  33. import java.util.*;
  34. import java.util.function.Function;
  35. import java.util.stream.Collectors;
  36. /**
  37. * 数据同步
  38. *
  39. * @author JNPF开发平台组
  40. * @version V3.1.0
  41. * @copyright 引迈信息技术有限公司
  42. * @date 2019年9月27日 上午9:18
  43. */
  44. @Slf4j
  45. @Service
  46. public class DbSyncServiceImpl implements DbSyncService {
  47. @Autowired
  48. private DbLinkService dblinkService;
  49. @Autowired
  50. private DbTableService dbTableService;
  51. @Autowired
  52. private SqlPrintHandler sqlPrintHandler;
  53. @Autowired
  54. private DataSourceUtil dataSourceUtil;
  55. private static Properties props;
  56. static {
  57. Properties props = new Properties();
  58. props.setProperty("remarks", "true"); //设置可以获取remarks信息
  59. props.setProperty("useInformationSchema", "true");//设置可以获取tables remarks信息
  60. DbSyncServiceImpl.props = props;
  61. }
  62. @Override
  63. public Integer executeCheck(String fromId, String toId, Map<String, String> convertRuleMap, String table) throws Exception {
  64. DbLinkEntity dbLinkFrom;
  65. DbLinkEntity dbLinkTo;
  66. if("0".equals(fromId)){
  67. dbLinkFrom = dataSourceUtil.init();
  68. }else {
  69. dbLinkFrom = DbLinkEntity.newInstance(fromId);
  70. }
  71. if("0".equals(toId)){
  72. dbLinkTo = dataSourceUtil.init();
  73. }else {
  74. dbLinkTo = DbLinkEntity.newInstance(toId);
  75. }
  76. //验证一(同库无法同步数据)
  77. if (fromId.equals(toId) ||
  78. (Objects.equals(dbLinkFrom.getHost(), dbLinkTo.getHost()) &&
  79. (Objects.equals(dbLinkFrom.getPort(), dbLinkTo.getPort()) &&
  80. (Objects.equals(dbLinkFrom.getDbName(), dbLinkTo.getDbName())
  81. )))){
  82. if(DbBase.ORACLE.equals(dbLinkFrom.getDbType()) || DbBase.DM.equals(dbLinkFrom.getDbType())){
  83. if(dbLinkFrom.getUserName().equals(dbLinkTo.getUserName())){
  84. return -1;
  85. }
  86. }else {
  87. return -1;
  88. }
  89. }
  90. //验证二(表存在)
  91. if (dbTableService.isExistTable(toId, table)) {
  92. //验证三(验证表数据)
  93. if (SqlFastUtil.tableDataExist(toId, table)) {
  94. //被同步表存在数据
  95. return 3;
  96. }
  97. }
  98. // 表不存在
  99. if (!dbTableService.isExistTable(toId, table)) {
  100. return 2;
  101. }
  102. return 0;
  103. }
  104. @Override
  105. public void execute(String dbLinkIdFrom, String dbLinkIdTo, Map<String, String> convertRuleMap, String table) throws Exception {
  106. executeTableCommon(dbLinkIdFrom, dbLinkIdTo, convertRuleMap, table);
  107. }
  108. @Override
  109. public Map<String, Integer> executeBatch(String dbLinkIdFrom, String dbLinkIdTo, Map<String, String> convertRuleMap, List<String> tableList) {
  110. Map<String, Integer> messageMap = new HashMap<>(16);
  111. for (int i = 0; i < tableList.size(); i++) {
  112. String table = tableList.get(i);
  113. int total = tableList.size();
  114. try{
  115. executeTableCommon(dbLinkIdFrom, dbLinkIdTo, convertRuleMap, table);
  116. messageMap.put(table, 1);
  117. log.info("表:(" + table + ")同步成功!" + "(" + (i + 1) + "/" + total + ")");
  118. }catch (Exception e){
  119. e.printStackTrace();
  120. messageMap.put(table, 0);
  121. log.info("表:(" + table + ")同步失败!" + "(" + (i + 1) + "/" + total + ")");
  122. }
  123. }
  124. return messageMap;
  125. }
  126. /**
  127. * 【主要】同步建表操作
  128. */
  129. public void executeTableCommon(String fromLinkId, String toLinkId, Map<String, String> convertRuleMap, String table) throws Exception {
  130. sqlPrintHandler.tableInfo(table);
  131. DbLinkEntity dbLinkFrom = dblinkService.getResource(fromLinkId);
  132. DbLinkEntity dbLinkTo = dblinkService.getResource(toLinkId);
  133. // 1、删除To表
  134. try{
  135. // 2、创建To表
  136. DbTableFieldModel tableMod = convertFileDataType(dbTableService.getDbTableModel(fromLinkId, table), convertRuleMap, dbLinkFrom.getDbType(), dbLinkTo.getDbType());
  137. if(!sqlPrintHandler.getPrintFlag()) SqlFastUtil.dropTable(dbLinkTo, table);
  138. SqlFastUtil.creTable(dbLinkTo, tableMod);
  139. // 3、同步数据 From -> To
  140. SqlFastUtil.batchInsert(table, dbLinkTo, getInsertMapList(dbLinkFrom, dbLinkTo.getDbType(), table));
  141. }catch (Exception ignore){
  142. ignore.printStackTrace();
  143. }
  144. }
  145. /**
  146. * 打印初始脚本
  147. *
  148. * @param dbLinkIdFrom 数据连接ID
  149. * @param printType dbInit:初始脚本、dbStruct:表结构、dbData:数据、tenant:多租户
  150. */
  151. public Map<String, Integer> printDbInit(String dbLinkIdFrom, String dbTypeTo, List<String> tableList, Map<String, String> convertRuleMap, String printType) throws Exception {
  152. DbLinkEntity dbLinkEntity = DbLinkEntity.newInstance(dbLinkIdFrom);
  153. if(CollectionUtil.isEmpty(tableList)){
  154. tableList = SqlFastUtil.getTableList(dbLinkEntity).stream().map(DbTableFieldModel::getTable).collect(Collectors.toList());
  155. }
  156. List<String> tableNameList = new ArrayList<>();
  157. Map<String, Integer> messageMap = new HashMap<>(16);
  158. for (int i = 0; i < tableList.size(); i++) {
  159. String table = tableList.get(i);
  160. sqlPrintHandler.tableInfo(table);
  161. tableNameList.add(table);
  162. DbTableFieldModel dbTableFieldModel;
  163. if(true){
  164. // 方式一:通过JDBC查询表字段信息
  165. dbTableFieldModel = convertFileDataType(new JdbcTableModel(dbLinkEntity, table).convertDbTableFieldModel(), convertRuleMap, dbLinkEntity.getDbType(), dbTypeTo);
  166. }else {
  167. // 方式二:通过SQL语句获取的表字段信息
  168. dbTableFieldModel = convertFileDataType(dbTableService.getDbTableModel(dbLinkIdFrom, table), convertRuleMap, dbLinkEntity.getDbType(), dbTypeTo);
  169. }
  170. List<Map<String, Object>> tableData = getInsertMapList(dbLinkEntity, dbTypeTo, table);
  171. DbLinkEntity dbLink = new DbLinkEntity(dbTypeTo);
  172. try{
  173. switch (printType){
  174. case "dbInit":
  175. case "dbNull":
  176. // SqlFastUtil.dropTable(dbLink, table);
  177. SqlFastUtil.creTable(dbLink, dbTableFieldModel);
  178. SqlFastUtil.batchInsert(table, dbLink, tableData);
  179. break;
  180. case "tenantCre":
  181. if(DbBase.POSTGRE_SQL.equals(dbTypeTo) || DbBase.KINGBASE_ES.equals(dbTypeTo)){
  182. // dbTableFieldModel.setTable("${dbName}." + dbTableFieldModel.getTable());
  183. } else if (DbBase.ORACLE.equals(dbTypeTo) ){
  184. // dbTableFieldModel.setTable("{schema}." + dbTableFieldModel.getTable());
  185. }
  186. case "dbStruct":
  187. // SqlFastUtil.dropTable(dbLink, table);
  188. SqlFastUtil.creTable(dbLink, dbTableFieldModel);
  189. break;
  190. case "dbData":
  191. SqlFastUtil.batchInsert(table, dbLink, tableData);
  192. break;
  193. }
  194. messageMap.put(table, 1);
  195. log.info("表:(" + table + ")同步成功!" + "(" + (i + 1) + "/" + tableList.size() + ")");
  196. }catch (Exception e){
  197. e.printStackTrace();
  198. messageMap.put(table, 0);
  199. log.info("表:(" + table + ")同步失败!" + "(" + (i + 1) + "/" + tableList.size() + ")");
  200. }
  201. }
  202. if(printType.equals("tenantCreNoTab") || printType.equals("tenantCre")){
  203. sqlPrintHandler.append("\n\n").append(creTenant(tableNameList, dbTypeTo));
  204. }
  205. return messageMap;
  206. }
  207. /**
  208. * 多租户创库
  209. */
  210. public static String creTenant(List<String> tableNameList, String dbEncode){
  211. List<String> ignoreTables = Arrays.asList("undo_log");
  212. StringBuilder insertTenant = new StringBuilder();
  213. for (String table : tableNameList) {
  214. if(ignoreTables.contains(table.toLowerCase())){
  215. continue;
  216. }
  217. String intoTable = table;
  218. String fromTable = "${dbName}." + table;
  219. switch (dbEncode){
  220. case DbBase.SQL_SERVER:
  221. fromTable = "${dbName}.dbo." + table;
  222. break;
  223. // case DbBase.POSTGRE_SQL:
  224. // intoTable = "${dbName}." + table;
  225. // fromTable = "\"public\"." + table;
  226. // break;
  227. case DbBase.ORACLE:
  228. // intoTable = "{schema}." + table;
  229. fromTable = "{initSchema}." + table;
  230. break;
  231. case DbBase.DM:
  232. case DbBase.KINGBASE_ES:
  233. case DbBase.MYSQL:
  234. }
  235. insertTenant.append("INSERT INTO ").append(intoTable).append(" SELECT * FROM ").append(fromTable)
  236. .append(" where ").append(TableFieldsNameConst.F_TENANT_ID).append(" = '0'").append(";").append("\n");
  237. }
  238. return insertTenant.toString();
  239. }
  240. /**
  241. * 获取插入数据map
  242. */
  243. public List<Map<String, Object>> getInsertMapList(DbLinkEntity dbLinkFrom, String toDbType, String table) throws Exception {
  244. List<List<JdbcColumnModel>> modelList = JdbcUtil.queryJdbcColumns(new PrepSqlDTO(SqlComEnum.SELECT_TABLE.getOutSql(table)).withConn(dbLinkFrom)).get();
  245. List<Map<String, Object>> insertMapList = new ArrayList<>();
  246. for (List<JdbcColumnModel> jdbcColumnModels : modelList) {
  247. Map<String, Object> map = new HashMap<>();
  248. for (JdbcColumnModel jdbcColumnModel : jdbcColumnModels) {
  249. map.put(jdbcColumnModel.getField(), checkValue(jdbcColumnModel, dbLinkFrom.getDbType()));
  250. FormatSqlOracle.nullValue(toDbType, jdbcColumnModel, map); // Oracle空串处理
  251. FormatSqlKingbaseES.nullValue(toDbType, jdbcColumnModel, map); // KingbaseES空串处理
  252. }
  253. insertMapList.add(map);
  254. }
  255. return insertMapList;
  256. }
  257. // 不同数据库之间,特殊数据类型与值校验
  258. private Object checkValue(JdbcColumnModel model, String dbType) throws Exception {
  259. Function<String, Boolean> checkVal = (dataType) ->
  260. model.getDataType().equalsIgnoreCase(dataType) && model.getValue() != null;
  261. switch (dbType){
  262. case DbBase.MYSQL:
  263. /* MySQL设置tinyint类型且长度为1时,JDBC读取时会变成BIT类型,java类型为Boolean类型。
  264. 1:true , 0:false */
  265. if(checkVal.apply("BIT")) return String.valueOf(model.getValue());
  266. case DbBase.ORACLE:
  267. if(checkVal.apply("NCLOB")) return String.valueOf(model.getValue());
  268. return FormatSqlOracle.timestamp(model.getValue());
  269. case DbBase.SQL_SERVER:
  270. case DbBase.KINGBASE_ES:
  271. case DbBase.DM:
  272. if(checkVal.apply("CLOB")){
  273. if(model.getValue() instanceof NClobProxyImpl) FormatSqlDM.getClob((NClobProxyImpl)(model.getValue()));
  274. }
  275. case DbBase.POSTGRE_SQL:
  276. // TODO 等待补充
  277. default:
  278. return model.getValue();
  279. }
  280. }
  281. /**
  282. * 【处理字段类型】
  283. */
  284. private DbTableFieldModel convertFileDataType(DbTableFieldModel dbTableFieldModel, Map<String, String> convertRuleMap,
  285. String fromDbEncode, String toDbEncode) throws Exception {
  286. String table = dbTableFieldModel.getTable();
  287. List<DbFieldModel> fields = dbTableFieldModel.getDbFieldModelList();
  288. // 规则Map里的(默认)去除
  289. if(convertRuleMap != null){
  290. convertRuleMap.forEach((key, val) ->{
  291. convertRuleMap.put(key, val.replace(" (默认)", ""));
  292. });
  293. }
  294. for (DbFieldModel field : fields) {
  295. try {
  296. // 设置转换数据类型
  297. field.getDtModelDTO().setConvertTargetDtEnum(DtSyncUtil.getToCovert(fromDbEncode, toDbEncode, field.getDataType(), convertRuleMap));
  298. if(toDbEncode.equals(DbBase.MYSQL)){
  299. FormatSqlMySQL.checkMysqlFieldPrimary(field, table);
  300. }
  301. }catch (DataException d){
  302. System.out.println("表_" + table + ":" + d.getMessage());
  303. DataException dataException = new DataException("目前还未支持数据类型" + toDbEncode + "." + table + "(" + field.getDataType() + ")");
  304. dataException.printStackTrace();
  305. // 类型寻找失败转换成字符串
  306. field.setDataType(DtModelDTO.getStringFixedDt(toDbEncode));
  307. throw dataException;
  308. }catch (Exception e) {
  309. e.printStackTrace();
  310. if(e instanceof DataTypeException){
  311. throw e;
  312. }
  313. log.info(e.getMessage());
  314. }
  315. }
  316. return dbTableFieldModel;
  317. }
  318. }