0 Star 0 Fork 0

浪子花梦 / 技术文章收录

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
shardingsphere-jdbc 读写分离及表拆分.md 23.79 KB
一键复制 编辑 原始数据 按行查看 历史

pom.xml

<dependency>
	   <groupId>org.apache.shardingsphere</groupId>
	   <artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
	   <version>5.1.1</version>
</dependency>

application.yml


spring:
  shardingsphere:
    mode:
      type: Memory
    datasource:
      names: master,slave
      master:
        url: jdbc:p6spy:mysql://127.0.0.1:15101/admin?allowPublicKeyRetrieval=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true
        username: root
        password: 123456
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.p6spy.engine.spy.P6SpyDriver
        hikari:
          auto-commit: false
          connection-timeout: 30000
          idle-timeout: 25000
          login-timeout: 5
          max-lifetime: 30000
          read-only: false
          validation-timeout: 3000
          maximum-pool-size: 50
          minimum-idle: 10
          data-source-properties:
            cachePrepStmts: true
            prepStmtCacheSize: 250
            prepStmtCacheSqlLimit: 2048
            useServerPrepStmts: true
            useLocalSessionState: true
            rewriteBatchedStatements: true
            cacheResultSetMetadata: true
            cacheServerConfiguration: true
            elideSetAutoCommits: true
            maintainTimeStats: false
      slave:
        url: jdbc:p6spy:mysql://127.0.0.1:15102/admin?allowPublicKeyRetrieval=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true
        username: root
        password: 123456
        type: com.zaxxer.hikari.HikariDataSource
        driver-class-name: com.p6spy.engine.spy.P6SpyDriver
        hikari:
          auto-commit: false
          connection-timeout: 30000
          idle-timeout: 25000
          login-timeout: 5
          max-lifetime: 30000
          read-only: true
          validation-timeout: 3000
          maximum-pool-size: 50
          minimum-idle: 10
          data-source-properties:
            cachePrepStmts: true
            prepStmtCacheSize: 250
            prepStmtCacheSqlLimit: 2048
            useServerPrepStmts: true
            useLocalSessionState: true
            rewriteBatchedStatements: true
            cacheResultSetMetadata: true
            cacheServerConfiguration: true
            elideSetAutoCommits: true
            maintainTimeStats: false
    rules:
      readwrite-splitting:
        data-sources:
          ds:
            type: STATIC
            props:
              # 主库
              write-data-source-name: master
              # 从库
              read-data-source-names: slave

      sharding:
        # 表策略配置
        tables:
          # t_user 是逻辑表
          t_user:
            # 配置数据节点这里是按月分表
            # 时间范围设置在202301 ~ 210012
            actualDataNodes: master.t_user_$->{2023..2100}0$->{1..9},master.t_user_$->{2023..2100}1$->{0..2}
            tableStrategy:
              # 使用标准分片策略
              standard:
                # 配置分片字段
                shardingColumn: create_time
                # 分片算法名称不支持大写字母和下划线否则启动就会报错
                shardingAlgorithmName: time-sharding-altorithm
        # 分片算法配置
        shardingAlgorithms:
          # 分片算法名称不支持大写字母和下划线否则启动就会报错
          time-sharding-altorithm:
            # 类型自定义策略
            type: CLASS_BASED
            props:
              # 分片策略
              strategy: standard
              # 分片算法类
              algorithmClassName: com.ithuameng.admin.sharding.TimeShardingAlgorithm

工具类

ShardingTableCacheEnum -- 分片表缓存枚举

/**
 * Registry of the logic tables that are sharded. Each constant pairs a logic table
 * name with a cache of the physical (sharded) table names recorded for it.
 *
 * <p>The per-table cache is a concurrent set because it is read by the sharding
 * algorithm and rewritten by the cache reload, which may happen on different threads.
 *
 * @author ithuameng
 */
public enum ShardingTableCacheEnum {

    /**
     * User table.
     */
    USER("t_user", java.util.concurrent.ConcurrentHashMap.newKeySet());

    /**
     * Logic table name.
     */
    private final String logicTableName;

    /**
     * Cache of physical table names recorded for this logic table.
     */
    private final Set<String> resultTableNamesCache;

    /**
     * Read-only lookup from logic table name to its constant; built once at class
     * initialization and never modified afterwards.
     */
    private static final Map<String, ShardingTableCacheEnum> VALUE_MAP;

    static {
        Map<String, ShardingTableCacheEnum> byName = new HashMap<>();
        for (ShardingTableCacheEnum item : values()) {
            byName.put(item.logicTableName, item);
        }
        VALUE_MAP = java.util.Collections.unmodifiableMap(byName);
    }

    ShardingTableCacheEnum(String logicTableName, Set<String> resultTableNamesCache) {
        this.logicTableName = logicTableName;
        this.resultTableNamesCache = resultTableNamesCache;
    }

    /**
     * Looks up the constant registered for a logic table name.
     *
     * @param value logic table name
     * @return the matching constant, or {@code null} when the name is not registered
     */
    public static ShardingTableCacheEnum of(String value) {
        return VALUE_MAP.get(value);
    }

    /**
     * @return the logic table name of this constant
     */
    public String logicTableName() {
        return logicTableName;
    }

    /**
     * @return the live, mutable cache of physical table names for this logic table
     */
    public Set<String> resultTableNamesCache() {
        return resultTableNamesCache;
    }

    /**
     * @return a read-only view of all registered logic table names; attempts to
     *         modify it throw {@link UnsupportedOperationException}
     */
    public static Set<String> logicTableNames() {
        return VALUE_MAP.keySet();
    }

    @Override
    public String toString() {
        return "ShardingTableCacheEnum{" +
                "logicTableName='" + logicTableName + '\'' +
                ", resultTableNamesCache=" + resultTableNamesCache +
                '}';
    }
}

ShardingAlgorithmTool -- 按月分片算法工具

/**
 * Helper for month-based table sharding: keeps the per-logic-table cache of physical
 * table names in sync with the database, creates missing shard tables on demand and
 * pushes the resulting actual data nodes into the ShardingSphere rule configuration.
 *
 * <p>Connection settings are read once, at class initialization, from the
 * {@code spring.shardingsphere.datasource.master.*} properties.
 *
 * @author ithuameng
 */
@Slf4j
public class ShardingAlgorithmTool {

    /**
     * Separator between the logic table name and the month suffix,
     * e.g. the "_" in t_contract_202201.
     */
    private static final String TABLE_SPLIT_SYMBOL = "_";

    /**
     * Format of the month suffix carried by physical table names.
     */
    private static final DateTimeFormatter SHARD_MONTH_FORMATTER = DateTimeFormatter.ofPattern("yyyyMM");

    /**
     * Database connection settings.
     * NOTE(review): resolved during class initialization, so this presumably requires the
     * Spring context to be available before the class is first used - confirm.
     */
    private static final Environment ENV = SpringUtils.getApplicationContext().getEnvironment();
    private static final String DATASOURCE_URL = ENV.getProperty("spring.shardingsphere.datasource.master.url");
    private static final String DATASOURCE_USERNAME = ENV.getProperty("spring.shardingsphere.datasource.master.username");
    private static final String DATASOURCE_PASSWORD = ENV.getProperty("spring.shardingsphere.datasource.master.password");


    /**
     * Resolves every candidate physical table name via
     * {@link #getShardingTableAndCreate(ShardingTableCacheEnum, String)}.
     *
     * @param logicTable       logic table
     * @param resultTableNames candidate physical table names, e.g. t_contract_202201
     * @return the set of resolved table names
     */
    public static Set<String> getShardingTablesAndCreate(ShardingTableCacheEnum logicTable, Collection<String> resultTableNames) {
        return resultTableNames.stream().map(o -> getShardingTableAndCreate(logicTable, o)).collect(Collectors.toSet());
    }

    /**
     * Returns the physical table name if it is cached or could be created; otherwise
     * (the month lies in the future, so no table is created) returns the logic table name.
     *
     * @param logicTable      logic table
     * @param resultTableName physical table name, e.g. t_contract_202201
     * @return the physical table name when it exists, else the logic table name
     */
    public static String getShardingTableAndCreate(ShardingTableCacheEnum logicTable, String resultTableName) {
        // Fast path: the table is already known.
        if (logicTable.resultTableNamesCache().contains(resultTableName)) {
            return resultTableName;
        }
        // Tables that are not created fall back to the (empty) logic table.
        boolean exists = createShardingTable(logicTable, resultTableName);
        return exists ? resultTableName : logicTable.logicTableName();
    }

    /**
     * Reloads the table name cache of every registered logic table.
     */
    public static void tableNameCacheReloadAll() {
        Arrays.stream(ShardingTableCacheEnum.values()).forEach(ShardingAlgorithmTool::tableNameCacheReload);
    }

    /**
     * Reloads the table name cache of one logic table from the database and then
     * refreshes the actual data nodes of the sharding rule.
     *
     * @param logicTable logic table, e.g. t_contract
     */
    public static void tableNameCacheReload(ShardingTableCacheEnum logicTable) {
        // Read the shard tables that currently exist in the database.
        List<String> tableNameList = getAllTableNameBySchema(logicTable);
        // Drop stale names and add new ones without ever emptying the cache, so a
        // concurrent reader does not observe an existing table as missing.
        Set<String> cache = logicTable.resultTableNamesCache();
        cache.retainAll(tableNameList);
        cache.addAll(tableNameList);
        // Push the new table list into the sharding rule.
        actualDataNodesRefresh(logicTable);
    }

    /**
     * Lists the physical tables of a logic table, i.e. tables named
     * {@code <logicTableName>_<six digits>}.
     *
     * @param logicTable logic table
     * @return names of the matching tables
     * @throws IllegalArgumentException when the connection settings are incomplete or
     *                                  the database cannot be queried
     */
    public static List<String> getAllTableNameBySchema(ShardingTableCacheEnum logicTable) {
        List<String> tableNames = new ArrayList<>();
        requireDatasourceConfig();
        String logicTableName = logicTable.logicTableName();
        // Compiled once per call instead of once per row; matches e.g. t_contract_202201.
        java.util.regex.Pattern shardNamePattern = java.util.regex.Pattern.compile(
                "^(" + java.util.regex.Pattern.quote(logicTableName + TABLE_SPLIT_SYMBOL) + "\\d{6})$");
        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD);
             Statement st = conn.createStatement()) {
            // The logic table name comes from the enum, not from user input.
            try (ResultSet rs = st.executeQuery("show TABLES like '" + logicTableName + TABLE_SPLIT_SYMBOL + "%'")) {
                while (rs.next()) {
                    String tableName = rs.getString(1);
                    // LIKE treats "_" as a wildcard, so filter again with the exact pattern.
                    if (tableName != null && shardNamePattern.matcher(tableName).matches()) {
                        tableNames.add(tableName);
                    }
                }
            }
        } catch (SQLException e) {
            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
            throw new IllegalArgumentException("数据库连接失败,请稍后重试", e);
        }
        return tableNames;
    }

    /**
     * Rebuilds the actual data nodes of a logic table from its cache and applies them
     * to the ShardingSphere data source. Failures are logged and not rethrown.
     *
     * @param logicTable logic table whose data nodes are refreshed
     */
    public static void actualDataNodesRefresh(ShardingTableCacheEnum logicTable) {
        try {
            // All shard tables are addressed through this data source name.
            String dbName = "master";
            String logicTableName = logicTable.logicTableName();
            Set<String> tableNamesCache = logicTable.resultTableNamesCache();
            log.info(">>>>>>>>>> 【INFO】更新分表配置,logicTableName:{},tableNamesCache:{}", logicTableName, tableNamesCache);

            // Build "master.t_x_202201,master.t_x_202202,..."
            String newActualDataNodes = tableNamesCache.stream().map(o -> String.format("%s.%s", dbName, o)).collect(Collectors.joining(","));
            ShardingSphereDataSource shardingSphereDataSource = SpringUtils.getBean(ShardingSphereDataSource.class);
            updateShardRuleActualDataNodes(shardingSphereDataSource, logicTableName, newActualDataNodes);
        } catch (Exception e) {
            log.error("初始化 动态表单失败,原因:{}", e.getMessage(), e);
        }
    }

    /**
     * Replaces the actual data nodes of one logic table inside the sharding rule
     * configuration and hands the complete rule list back to the context manager.
     *
     * @param dataSource         ShardingSphere data source whose rules are altered
     * @param logicTableName     logic table to update
     * @param newActualDataNodes comma-separated actual data nodes
     */
    private static void updateShardRuleActualDataNodes(ShardingSphereDataSource dataSource, String logicTableName, String newActualDataNodes) {
        // Context manager.
        ContextManager contextManager = dataSource.getContextManager();

        // Rule configuration.
        String schemaName = "logic_db";
        Collection<RuleConfiguration> newRuleConfigList = new LinkedList<>();
        Collection<RuleConfiguration> oldRuleConfigList = contextManager
                .getMetaDataContexts()
                .getMetaData(schemaName)
                .getRuleMetaData()
                .getConfigurations();

        for (RuleConfiguration oldRuleConfig : oldRuleConfigList) {
            if (!(oldRuleConfig instanceof AlgorithmProvidedShardingRuleConfiguration)) {
                // Carry every non-sharding rule (e.g. readwrite-splitting) over unchanged.
                // NOTE(review): assumes alterRuleConfiguration treats the passed collection
                // as the full rule set of the schema, so omitted rules would be dropped - confirm.
                newRuleConfigList.add(oldRuleConfig);
                continue;
            }

            // Algorithm provided sharding rule configuration
            AlgorithmProvidedShardingRuleConfiguration oldAlgorithmConfig = (AlgorithmProvidedShardingRuleConfiguration) oldRuleConfig;
            AlgorithmProvidedShardingRuleConfiguration newAlgorithmConfig = new AlgorithmProvidedShardingRuleConfiguration();

            // Copy the table rules, swapping in the new data nodes for the target table.
            Collection<ShardingTableRuleConfiguration> newTableRuleConfigList = new LinkedList<>();
            for (ShardingTableRuleConfiguration oldTableRuleConfig : oldAlgorithmConfig.getTables()) {
                if (logicTableName.equals(oldTableRuleConfig.getLogicTable())) {
                    ShardingTableRuleConfiguration newTableRuleConfig = new ShardingTableRuleConfiguration(oldTableRuleConfig.getLogicTable(), newActualDataNodes);
                    newTableRuleConfig.setTableShardingStrategy(oldTableRuleConfig.getTableShardingStrategy());
                    newTableRuleConfig.setDatabaseShardingStrategy(oldTableRuleConfig.getDatabaseShardingStrategy());
                    newTableRuleConfig.setKeyGenerateStrategy(oldTableRuleConfig.getKeyGenerateStrategy());
                    newTableRuleConfigList.add(newTableRuleConfig);
                } else {
                    newTableRuleConfigList.add(oldTableRuleConfig);
                }
            }

            newAlgorithmConfig.setTables(newTableRuleConfigList);
            newAlgorithmConfig.setAutoTables(oldAlgorithmConfig.getAutoTables());
            newAlgorithmConfig.setBindingTableGroups(oldAlgorithmConfig.getBindingTableGroups());
            newAlgorithmConfig.setBroadcastTables(oldAlgorithmConfig.getBroadcastTables());
            newAlgorithmConfig.setDefaultDatabaseShardingStrategy(oldAlgorithmConfig.getDefaultDatabaseShardingStrategy());
            newAlgorithmConfig.setDefaultTableShardingStrategy(oldAlgorithmConfig.getDefaultTableShardingStrategy());
            newAlgorithmConfig.setDefaultKeyGenerateStrategy(oldAlgorithmConfig.getDefaultKeyGenerateStrategy());
            newAlgorithmConfig.setDefaultShardingColumn(oldAlgorithmConfig.getDefaultShardingColumn());
            newAlgorithmConfig.setShardingAlgorithms(oldAlgorithmConfig.getShardingAlgorithms());
            newAlgorithmConfig.setKeyGenerators(oldAlgorithmConfig.getKeyGenerators());

            newRuleConfigList.add(newAlgorithmConfig);
        }

        // update context
        contextManager.alterRuleConfiguration(schemaName, newRuleConfigList);
    }

    /**
     * Makes sure the physical table exists, creating it from the logic table's
     * definition when needed. Tables for months after the current month are never created.
     *
     * @param logicTable      logic table
     * @param resultTableName physical table name, e.g. t_contract_202201
     * @return {@code true} when the table exists after the call (created here or found
     *         in the cache), {@code false} when it was not created because its month
     *         is in the future
     */
    private static boolean createShardingTable(ShardingTableCacheEnum logicTable, String resultTableName) {
        // Tables for future months are not created ahead of time.
        String month = resultTableName.replace(logicTable.logicTableName() + TABLE_SPLIT_SYMBOL, "");
        YearMonth shardingMonth = YearMonth.parse(month, SHARD_MONTH_FORMATTER);
        if (shardingMonth.isAfter(YearMonth.now())) {
            return false;
        }

        // One lock per logic table name serializes table creation for that table.
        synchronized (logicTable.logicTableName().intern()) {
            // Re-check under the lock: another thread may have created the table while
            // this one was waiting. The table exists, so report success.
            if (logicTable.resultTableNamesCache().contains(resultTableName)) {
                return true;
            }
            // Not cached: create the table, then reload the cache.
            executeSql(Collections.singletonList("CREATE TABLE IF NOT EXISTS `" + resultTableName + "` LIKE `" + logicTable.logicTableName() + "`;"));
            tableNameCacheReload(logicTable);
        }
        return true;
    }

    /**
     * Executes the given statements on a dedicated connection as one transaction.
     *
     * @param sqlList statements to execute, in order
     * @throws IllegalArgumentException when the connection settings are incomplete, the
     *                                  connection fails, or a statement fails
     */
    private static void executeSql(List<String> sqlList) {
        requireDatasourceConfig();
        try (Connection conn = DriverManager.getConnection(DATASOURCE_URL, DATASOURCE_USERNAME, DATASOURCE_PASSWORD)) {
            try (Statement st = conn.createStatement()) {
                conn.setAutoCommit(false);
                for (String sql : sqlList) {
                    st.execute(sql);
                }
                // Auto-commit is off, so the work has to be committed explicitly.
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                log.error(">>>>>>>>>> 【ERROR】数据表创建执行失败,请稍后重试,原因:{}", e.getMessage(), e);
                throw new IllegalArgumentException("数据表创建执行失败,请稍后重试", e);
            }
        } catch (SQLException e) {
            log.error(">>>>>>>>>> 【ERROR】数据库连接失败,请稍后重试,原因:{}", e.getMessage(), e);
            throw new IllegalArgumentException("数据库连接失败,请稍后重试", e);
        }
    }

    /**
     * Verifies that URL, username and password are all configured.
     * The password is deliberately left out of the log output.
     *
     * @throws IllegalArgumentException when any of the three settings is empty
     */
    private static void requireDatasourceConfig() {
        if (StringUtils.isEmpty(DATASOURCE_URL) || StringUtils.isEmpty(DATASOURCE_USERNAME) || StringUtils.isEmpty(DATASOURCE_PASSWORD)) {
            log.error(">>>>>>>>>> 【ERROR】数据库连接配置有误,请稍后重试,URL:{}, username:{}", DATASOURCE_URL, DATASOURCE_USERNAME);
            throw new IllegalArgumentException("数据库连接配置有误,请稍后重试");
        }
    }

}

ShardingTablesLoadRunner -- 项目启动后,读取已有分表,进行缓存

/**
 * Startup runner that reads the shard tables which already exist and loads
 * them into the table name cache.
 *
 * @author ithuameng
 */
@Component
@Order(1) // lower values run earlier
public class ShardingTablesLoadRunner implements CommandLineRunner {

    /**
     * Reloads the table name cache of every registered logic table.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) {
        ShardingAlgorithmTool.tableNameCacheReloadAll();
    }
}

TimeShardingAlgorithm -- 分片算法,按月分片

/**
 * Sharding algorithm that routes rows to one physical table per month, named
 * {@code <logicTable>_yyyyMM}, based on a {@link LocalDateTime} sharding value.
 *
 * @author ithuameng
 */
@Slf4j
public class TimeShardingAlgorithm implements StandardShardingAlgorithm<LocalDateTime> {

    /**
     * Format of the month suffix of a physical table name.
     */
    private static final DateTimeFormatter TABLE_SHARD_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMM");

    /**
     * Full date-time format used to turn a month suffix into the first instant of that month.
     */
    private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");

    /**
     * Separator between the logic table name and the month suffix,
     * e.g. the "_" in t_contract_202201.
     */
    private static final String TABLE_SPLIT_SYMBOL = "_";


    /**
     * Precise sharding: maps a single sharding value to the table of its month.
     *
     * @param tableNames           table names configured for the sharding node
     * @param preciseShardingValue sharding value carrying the logic table name and the
     *                             value taken from the SQL statement
     * @return the table name resolved by {@link ShardingAlgorithmTool#getShardingTableAndCreate}
     * @throws IllegalArgumentException when the logic table is not registered in
     *                                  {@link ShardingTableCacheEnum}
     */
    @Override
    public String doSharding(Collection<String> tableNames, PreciseShardingValue<LocalDateTime> preciseShardingValue) {
        String logicTableName = preciseShardingValue.getLogicTableName();
        ShardingTableCacheEnum logicTable = ShardingTableCacheEnum.of(logicTableName);
        if (logicTable == null) {
            log.error(">>>>>>>>>> 【ERROR】数据表类型错误,请稍后重试,logicTableNames:{},logicTableName:{}",
                    ShardingTableCacheEnum.logicTableNames(), logicTableName);
            throw new IllegalArgumentException("数据表类型错误,请稍后重试");
        }

        // Log the sharding inputs.
        log.info(">>>>>>>>>> 【INFO】精确分片,节点配置表名:{},数据库缓存表名:{}", tableNames, logicTable.resultTableNamesCache());

        LocalDateTime dateTime = preciseShardingValue.getValue();
        String resultTableName = logicTableName + TABLE_SPLIT_SYMBOL + dateTime.format(TABLE_SHARD_TIME_FORMATTER);
        // Register the computed name with the available targets before resolving it.
        // NOTE(review): presumably needed so the framework accepts a table that is not in
        // the configured nodes yet - confirm against the ShardingSphere version in use.
        if (!tableNames.contains(resultTableName)) {
            tableNames.add(resultTableName);
        }
        // Returns the table if it exists or could be created.
        return ShardingAlgorithmTool.getShardingTableAndCreate(logicTable, resultTableName);
    }

    /**
     * Range sharding: maps a value range to the tables of every month it touches.
     * A missing bound is replaced by the earliest/latest month found in the table cache.
     *
     * @param tableNames         table names configured for the sharding node
     * @param rangeShardingValue sharding range
     * @return the table names resolved by {@link ShardingAlgorithmTool#getShardingTablesAndCreate}
     * @throws IllegalArgumentException when the logic table is not registered, or a bound
     *                                  is missing and the table cache is empty
     */
    @Override
    public Collection<String> doSharding(Collection<String> tableNames, RangeShardingValue<LocalDateTime> rangeShardingValue) {
        String logicTableName = rangeShardingValue.getLogicTableName();
        ShardingTableCacheEnum logicTable = ShardingTableCacheEnum.of(logicTableName);
        if (logicTable == null) {
            log.error(">>>>>>>>>> 【ERROR】逻辑表范围异常,请稍后重试,logicTableNames:{},logicTableName:{}",
                    ShardingTableCacheEnum.logicTableNames(), logicTableName);
            throw new IllegalArgumentException("逻辑表范围异常,请稍后重试");
        }

        // Log the sharding inputs.
        log.info(">>>>>>>>>> 【INFO】范围分片,节点配置表名:{},数据库缓存表名:{}", tableNames, logicTable.resultTableNamesCache());

        // Bounds of the range condition (e.g. BETWEEN ... AND ...).
        Range<LocalDateTime> valueRange = rangeShardingValue.getValueRange();
        boolean hasLowerBound = valueRange.hasLowerBound();
        boolean hasUpperBound = valueRange.hasUpperBound();

        // Fall back to the cached tables for a missing bound.
        Set<String> tableNameCache = logicTable.resultTableNamesCache();
        LocalDateTime min = hasLowerBound ? valueRange.lowerEndpoint() : getLowerEndpoint(tableNameCache);
        LocalDateTime max = hasUpperBound ? valueRange.upperEndpoint() : getUpperEndpoint(tableNameCache);

        // Walk month by month, starting at the first instant of the lower bound's month.
        // Comparing month starts against max covers every month from min's to max's
        // inclusive, without iterating over each minute of the range.
        Set<String> resultTableNames = new LinkedHashSet<>();
        LocalDateTime monthStart = min.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
        while (!monthStart.isAfter(max)) {
            resultTableNames.add(logicTableName + TABLE_SPLIT_SYMBOL + monthStart.format(TABLE_SHARD_TIME_FORMATTER));
            monthStart = monthStart.plusMonths(1);
        }
        return ShardingAlgorithmTool.getShardingTablesAndCreate(logicTable, resultTableNames);
    }

    @Override
    public void init() {

    }

    @Override
    public String getType() {
        return null;
    }

    /**
     * Finds the first instant of the earliest month among the given table names.
     *
     * @param tableNames physical table names ending in a yyyyMM suffix
     * @return first instant of the earliest month
     * @throws IllegalArgumentException when the collection is empty
     */
    private LocalDateTime getLowerEndpoint(Collection<String> tableNames) {
        return tableNames.stream()
                .map(TimeShardingAlgorithm::parseShardMonthStart)
                .min(LocalDateTime::compareTo)
                .orElseThrow(() -> {
                    log.error(">>>>>>>>>> 【ERROR】获取数据最小分表失败,请稍后重试,tableName:{}", tableNames);
                    return new IllegalArgumentException("获取数据最小分表失败,请稍后重试");
                });
    }

    /**
     * Finds the first instant of the latest month among the given table names.
     *
     * @param tableNames physical table names ending in a yyyyMM suffix
     * @return first instant of the latest month
     * @throws IllegalArgumentException when the collection is empty
     */
    private LocalDateTime getUpperEndpoint(Collection<String> tableNames) {
        return tableNames.stream()
                .map(TimeShardingAlgorithm::parseShardMonthStart)
                .max(LocalDateTime::compareTo)
                .orElseThrow(() -> {
                    log.error(">>>>>>>>>> 【ERROR】获取数据最大分表失败,请稍后重试,tableName:{}", tableNames);
                    return new IllegalArgumentException("获取数据最大分表失败,请稍后重试");
                });
    }

    /**
     * Converts a physical table name into the first instant of its month by parsing
     * the text after the last separator, e.g. t_user_202301 -> 2023-01-01T00:00.
     * Only the suffix is parsed because the logic table name itself may contain separators.
     */
    private static LocalDateTime parseShardMonthStart(String tableName) {
        String monthSuffix = tableName.substring(tableName.lastIndexOf(TABLE_SPLIT_SYMBOL) + 1);
        return LocalDateTime.parse(monthSuffix + "01 00:00:00", DATE_TIME_FORMATTER);
    }
}
1
https://gitee.com/ithuameng/blog.git
git@gitee.com:ithuameng/blog.git
ithuameng
blog
技术文章收录
master

搜索帮助