站酷网页,手机怎样做自己的网站,软件著作权怎么写,公司网站设计广州背景#xff1a;项目需迁库#xff0c;使用navicat的工具数据传输或者转储去执行都会报错#xff0c;因为数据量超大#xff0c;然后尝试通过代码实现#xff0c;先在指定连接下建库#xff0c;然后从源库转储表结构出来#xff0c;到目标库执行。然后在代码去实现同步所…背景项目需迁库使用navicat的工具数据传输或者转储去执行都会报错因为数据量超大然后尝试通过代码实现先在指定连接下建库然后从源库转储表结构出来到目标库执行。然后在代码去实现同步所有这些表的数据能力有限将就看看。整体的逻辑大致是1先搭目标库转储源库表结构去目标库执行使目标库与源库表结构一致2拿到源库所有表名下有sql到时候代码就跑这些表名的表同步数据3代码里同步模式isTotalOrAdd如果是全量total的时候跑那么他一张张表循环跑的时候每次都会先truncate table这张表然后再去新增如果是add增量的话它会去拿这个表的主键然后查这个表大于当前最大主键的数据去插入注意这得保证每个表都得有主键才行我当时跑很多表没主键去AI好像有个办法可以直接查到库里所有没主键的表给他加上来着但那个没存找不见了4存在一张表数据量超大溢出报错所以后有限制分页每次都查1000条。5关于报错日志因为一次性跑所有表控制台日志会被刷跑所以收集了所有报错信息把它存到了本地路径文件里另外某个表同步报错了不会中断它会记录日志接着跑下一张表6有些表其实跑的时候会报错的比如视图、有外键的表之类的或者一些大表也可以直接通过navicat工具去数据传输的可以让他代码跑大部分表自己手动跑少部分的大表节省时间我的代码自己跑的话1小时应该有两百万条大概估的自己用的时候试试先跑一个表看看效果注意备份成功再跑所有。mysql查源库下所有表名会带上视图复制出来在编辑器直接替换成逗号间隔的字符串\r\n---,这个是要同步的表名自己手动跑的记得摘出去-- 查询INFORMATION_SCHEMA系统表 SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA data_new ORDER BY TABLE_NAME;controller代码/** * 同步数据Controller */ RestController RequestMapping(/data/sync) public class DataSyncController extends BaseController { Autowired private DataSyncService dataSyncService; /**【这个就在最初跑 运行时 注意切换数据源、开启从库开关enabled另数据库连接加参在数据源配置中添加允许零日期的参数配置 zeroDateTimeBehaviorconvertToNull】 * return */ GetMapping(/dataSync) public RequestResult dataSync() { return RequestResult.success(dataSyncService.dataSync()); } }service类public interface DataSyncService { int dataSync(); }实现类Service public class DataSyncServiceImpl implements DataSyncService { Autowired private TbDataMapper tbDataMapper; Autowired private TbTsDataMapper tsDataMapper; //这个是库名 使用时有叫不同的库名的他原生的写法都是库名.表名用的表导致库名不一致得改所有为方便所以直接配成能通用的 Value(${spring.custom.datasource.data_schema}) private String DATA_SCHEMA; Override public int dataSync() { //主库实际使用时存在一些表报错的情况比如视图、存在外键的表等这些还是得手动推就是navicat里选中库--工具---数据传输去同步走不了代码 String tableNames test1_table,test2_table; // 同步模式total(全量) 或 add(增量) String isTotalOrAdd total; // String isTotalOrAdd add; ListSyncErrorRecord errorList new ArrayList(); ListString tableNamesList Arrays.asList(tableNames.split(,)); int totalCount 0; for (String tableName : tableNamesList) { try { int tableCount syncSingleTableMaster(isTotalOrAdd, tableName, errorList); totalCount tableCount; } catch (Exception e) { SyncErrorRecord errorRecord new SyncErrorRecord(); errorRecord.setTableName(tableName); errorRecord.setErrorMessage(e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType(FULL_SYNC_ERROR); errorList.add(errorRecord); System.err.println(表 tableName 同步时发生错误: e.getMessage()); e.printStackTrace(); } } System.out.println(总共同步了 totalCount 条数据); if (CollectionUtils.isNotEmpty(errorList)) { System.out.println(《~~~~~~~~~~~~主库同步过程中发生错误的表~~~~~~~~~~~~》); for (SyncErrorRecord error : errorList) { System.out.println(表名: error.getTableName() , 错误: error.getErrorMessage()); } System.out.println(《############主库同步过程中发生错误的表############》); // 保存错误记录 这里会把日志文件保存到文件里最后跑完看哪些表报错了再手动推就行 saveErrorRecords(errorList); } return totalCount; } /** * 数据同步处理 * param isTotalOrAdd * param tableName * param errorList * return */ private int syncSingleTableMaster(String isTotalOrAdd, String tableName, ListSyncErrorRecord errorList) { // 如果是全量同步清空目标表数据 if (total.equals(isTotalOrAdd)) { tsDataMapper.truncateTableNewdb(tableName); } int totalCount 0; Object maxPrimaryKeyValue null; String primaryKeyColumn null; try { // 获取表的主键字段名 primaryKeyColumn tsDataMapper.getPrimaryKeyColumn(DATA_SCHEMA, tableName); // 如果是增量同步查询目标表中的最大主键值 if (add.equals(isTotalOrAdd)) { maxPrimaryKeyValue tsDataMapper.getMaxPrimaryKeyValueCmg(tableName, primaryKeyColumn); System.out.println(表 tableName 主键字段: primaryKeyColumn , 当前最大值: maxPrimaryKeyValue); } } catch (Exception e) { SyncErrorRecord errorRecord new SyncErrorRecord(); errorRecord.setTableName(tableName); errorRecord.setErrorMessage(查询目标表主键最大值失败: e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType(PRIMARY_KEY_QUERY_ERROR); errorList.add(errorRecord); return totalCount; } int offset 0; // 每次处理1000条记录 int limit 1000; boolean hasMoreData true; while (hasMoreData) { try { // 分页查询数据根据主键值进行增量查询 ListMapString, Object dataList tbDataMapper.getMasterDataListNewdbPcByPageWithPrimaryKeyCmg( tableName, offset, limit, primaryKeyColumn, maxPrimaryKeyValue); if (CollectionUtils.isNotEmpty(dataList)) { // 获取所有列名 SetString allColumnNames getAllColumnNames(dataList); // 标准化数据结构 ListMapString, Object standardizedDataList standardizeData(dataList, allColumnNames); // 分批插入 int count 0; for (int i 0; i standardizedDataList.size(); i 500) { ListMapString, Object batch standardizedDataList.subList(i, Math.min(i 500, standardizedDataList.size())); SetString batchColumnNames getAllColumnNames(batch); ListMapString, Object finalBatch standardizeData(batch, batchColumnNames); count tsDataMapper.insertBatchNewdbNew(tableName, finalBatch); } totalCount count; offset limit; // 如果返回的数据少于limit说明已经没有更多数据 if (dataList.size() limit) { hasMoreData false; } } else { // 没有查询到数据说明已同步完所有新增数据 hasMoreData false; } } catch (Exception e) { SyncErrorRecord errorRecord new SyncErrorRecord(); errorRecord.setTableName(tableName); errorRecord.setErrorMessage(分页同步失败: e.getMessage()); errorRecord.setErrorTime(new Date()); errorRecord.setErrorType(PAGE_SYNC_ERROR); errorList.add(errorRecord); break; // 出错则跳出循环 } } System.out.println(表 tableName 同步了 totalCount 条数据); return totalCount; } /** * 将错误记录保存到文件或专门的错误表中 * * 收集错误日志示例 * ~~~~~~~~~~~~主库同步过程中发生错误的表~~~~~~~~~~~~》 * 表名: test_table, 错误: 分页同步失败: * ### Error querying database. Cause: java.sql.SQLSyntaxErrorException: SELECT command denied to user data_new1.1.1.1 for table test_table * ### The error may exist in file [D:\app\target\classes\mybatis\data\DataMapper.xml] * ### The error may involve defaultParameterMap * ### The error occurred while setting parameters * ### SQL: SELECT * FROM data_new.test_table ORDER BY LOG_NR_ ASC LIMIT ?, ? * ### Cause: java.sql.SQLSyntaxErrorException: SELECT command denied to user data_new1.1.1.1 for table test_table * ; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: SELECT command denied to user data_new1.1.1.1 for table test_table * 《############主库同步过程中发生错误的表############》 * 错误记录已保存到文件: C:\Users\Administrator\AppData\Local\Temp\\sync_errors_1766713290763.log * * param errorList */ private void saveErrorRecords(ListSyncErrorRecord errorList) { // 将错误记录保存到文件或专门的错误表中 try { // // 方案1: 指定绝对路径 // String errorLogPath /path/to/logs/sync_errors_ System.currentTimeMillis() .log; // // // 方案2: 使用相对路径保存到项目logs目录下 // String errorLogPath logs/sync_errors_ System.currentTimeMillis() .log; // 方案3: 使用系统临时目录 String tempDir System.getProperty(java.io.tmpdir); String errorLogPath tempDir File.separator sync_errors_ System.currentTimeMillis() .log; // 保存到文件示例 // String errorLog sync_errors_ System.currentTimeMillis() .log; try (FileWriter writer new FileWriter(errorLogPath)) { for (SyncErrorRecord error : errorList) { writer.write(String.format(表名: %s, 错误类型: %s, 错误信息: %s, 时间: %s%n, error.getTableName(), error.getErrorType(), error.getErrorMessage(), error.getErrorTime())); } } System.out.println(错误记录已保存到文件: errorLogPath); } catch (IOException e) { System.err.println(保存错误记录失败: e.getMessage()); } } /** * 获取所有记录中的列名集合 */ private SetString getAllColumnNames(ListMapString, Object dataList) { SetString allColumnNames new LinkedHashSet(); for (MapString, Object row : dataList) { allColumnNames.addAll(row.keySet()); } return allColumnNames; } /** * 标准化数据确保所有行具有相同的列结构 */ private ListMapString, Object standardizeData(ListMapString, Object dataList, SetString allColumnNames) { ListMapString, Object standardizedList new ArrayList(); for (MapString, Object originalRow : dataList) { MapString, Object standardizedRow new LinkedHashMap(); // 确保所有列都存在缺失的列设为null for (String columnName : allColumnNames) { standardizedRow.put(columnName, originalRow.get(columnName)); } standardizedList.add(standardizedRow); } return standardizedList; }错误记录实体类/** * 错误记录实体类 */ Data public class SyncErrorRecord { private String tableName; private String errorMessage; private Date errorTime; // FULL_SYNC_ERROR, INCREMENTAL_SYNC_ERROR private String errorType; }mapper类/** * 目标库 */ Repository public interface TbTsDataMapper { DataSource(DataSourceType.NEWDB) void truncateTableNewdb(Param(tableName) String tableName); DataSource(DataSourceType.NEWDB) String getPrimaryKeyColumn(Param(dataSchema) String dataSchema, Param(tableName) String tableName); DataSource(DataSourceType.NEWDB) Object getMaxPrimaryKeyValueCmg(Param(tableName) String tableName, Param(primaryKeyColumn) String primaryKeyColumn); DataSource(DataSourceType.NEWDB) int insertBatchNewdbNew(Param(tableName) String tableName, Param(dataList) ListMapString, Object dataList); } /** * 源库 */ Repository public interface TbDataMapper { DataSource(DataSourceType.MASTER) ListMapString, Object getMasterDataListNewdbPcByPageWithPrimaryKeyCmg( Param(tableName) String tableName, Param(offset) int offset, Param(limit) int limit, Param(primaryKeyColumn) String primaryKeyColumn, Param(maxPrimaryKeyValue) Object maxPrimaryKeyValue); }目标库的TbTsDataMapper.xml方法update idtruncateTableNewdb TRUNCATE TABLE ${data_schema}.${tableName} /update select idgetPrimaryKeyColumn resultTypejava.lang.String SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE TABLE_SCHEMA #{dataSchema} AND TABLE_NAME #{tableName} AND COLUMN_KEY PRI LIMIT 1 /select select idgetMaxPrimaryKeyValueCmg resultTypejava.lang.Object SELECT MAX(${primaryKeyColumn}) FROM ${data_schema}.${tableName} /select insert idinsertBatchNewdbNew INSERT INTO ${data_schema}.${tableName} trim prefix( suffix) suffixOverrides, foreach collectiondataList[0].keySet() itemkey if testkey ! null and key ! ${key}, /if /foreach /trim VALUES foreach collectiondataList itemdata separator, trim prefix( suffix) suffixOverrides, foreach collectiondataList[0].keySet() itemkey if testkey ! null and key ! choose when testdata[key] null NULL, /when otherwise #{data.${key}}, /otherwise /choose /if /foreach /trim /foreach /insert源库的TbDataMapper.xml方法select idgetMasterDataListNewdbPcByPageWithPrimaryKeyCmg resultTypejava.util.Map SELECT * FROM ${tableName} where if testmaxPrimaryKeyValue ! null ${primaryKeyColumn} #{maxPrimaryKeyValue} /if /where ORDER BY ${primaryKeyColumn} ASC LIMIT #{offset}, #{limit} /select部分application-test.yml内容spring: datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.cj.jdbc.Driver druid: # 主库数据源 master: url: jdbc:mysql://x.x.x.x:xxxx/data_old?useUnicodetruecharacterEncodingutf8useSSLfalseserverTimezoneAsia/Shanghai username: masterusername password: masterpassword # 从库数据源 slave: # 从数据源开关/默认关闭 enabled: true url: jdbc:mysql://xx.xx.xx.xx:xxxx/test?useUnicodetruecharacterEncodingutf8 username: slaveusername password: slavepassword driverClassName: com.mysql.jdbc.Driver # 新库数据源 newdb: # 从数据源开关/默认关闭 enabled: true url: jdbc:mysql://xxx.xxx.xxx.xxx:xxx/data_new?useUnicodetruecharacterEncodingutf8 username: newdbusername password: newdbpassword driverClassName: com.mysql.jdbc.Driver # 初始连接数 initialSize: 5 # 最小连接池数量 minIdle: 10 # 最大连接池数量 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测检测需要关闭的空闲连接单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间单位是毫秒 minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最大生存的时间单位是毫秒 maxEvictableIdleTimeMillis: 900000 # 配置检测连接是否有效 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false webStatFilter: enabled: true statViewServlet: enabled: true # 设置白名单不填则允许所有访问 allow: url-pattern: /druid/* # 控制台管理用户名和密码 login-username: login-password: filter: stat: enabled: true # 慢SQL记录 log-slow-sql: true slow-sql-millis: 1000 merge-sql: true wall: config: multi-statement-allow: true # 自定义数据库 schema 配置 custom: datasource: # 新库 data_schema: data_new如果要支持使用配置的库名还得在MyBatisConfig类加data_schema的配置/** * Mybatis支持*匹配扫描包 * * author admin */ //Configuration Component public class MyBatisConfig { Autowired private Environment env; static final String DEFAULT_RESOURCE_PATTERN **/*.class; private final static String COMMA ,; /** * 20250804加 处理正式、测试 数据库名不一致问题 */ Value(${spring.custom.datasource.data_schema}) private String DATA_SCHEMA; Bean public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception { String typeAliasesPackage env.getProperty(mybatis.typeAliasesPackage); String mapperLocations env.getProperty(mybatis.mapperLocations); String configLocation env.getProperty(mybatis.configLocation); typeAliasesPackage setTypeAliasesPackage(typeAliasesPackage); VFS.addImplClass(SpringBootVFS.class); /* 【2025.10.09发现 mybatis和mybatis-plus同时使用时 无法使用BaseMapper的公用方法参考https://blog.csdn.net/m0_67402125/article/details/123991284?utm_mediumdistribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-123991284-blog-147252964.235^v43^pc_blog_bottom_relevance_base1spm1001.2101.3001.4242.1utm_relevant_index3】 解决方法在配置数据源的地方换一个类即可 */ // final SqlSessionFactoryBean sessionFactory new SqlSessionFactoryBean(); MybatisSqlSessionFactoryBean sessionFactorynew MybatisSqlSessionFactoryBean(); sessionFactory.setDataSource(dataSource); //20250804加 处理正式、测试 数据库名不一致问题 // 设置MyBatis属性 Properties properties new Properties(); properties.setProperty(cm_mg_schema, CM_MG_SCHEMA); sessionFactory.setConfigurationProperties(properties); sessionFactory.setTypeAliasesPackage(typeAliasesPackage); sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations)); sessionFactory.setConfigLocation(new DefaultResourceLoader().getResource(configLocation)); return sessionFactory.getObject(); } public static String setTypeAliasesPackage(String typeAliasesPackage) { ResourcePatternResolver resolver (ResourcePatternResolver) new PathMatchingResourcePatternResolver(); MetadataReaderFactory metadataReaderFactory new CachingMetadataReaderFactory(resolver); ListString allResult new ArrayListString(); try { for (String aliasesPackage : typeAliasesPackage.split(COMMA)) { ListString result new ArrayListString(); aliasesPackage ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX ClassUtils.convertClassNameToResourcePath(aliasesPackage.trim()) / DEFAULT_RESOURCE_PATTERN; Resource[] resources resolver.getResources(aliasesPackage); if (resources ! null resources.length 0) { MetadataReader metadataReader null; for (Resource resource : resources) { if (resource.isReadable()) { metadataReader metadataReaderFactory.getMetadataReader(resource); try { result.add(Class.forName(metadataReader.getClassMetadata().getClassName()).getPackage().getName()); } catch (ClassNotFoundException e) { e.printStackTrace(); } } } } if (result.size() 0) { HashSetString hashResult new HashSetString(result); allResult.addAll(hashResult); } } if (allResult.size() 0) { typeAliasesPackage String.join(,, (String[]) allResult.toArray(new String[0])); } else { throw new RuntimeException(mybatis typeAliasesPackage 路径扫描错误,参数typeAliasesPackage: typeAliasesPackage 未找到任何包); } } catch (IOException e) { e.printStackTrace(); } return typeAliasesPackage; } }