• SpringBoot与Calcite整合,实现多数据源统一查询系统

SpringBoot与Calcite整合,实现多数据源统一查询系统

2025-04-27 10:40:42 栏目:宝塔面板 463 阅读

最近,接到一个电商系统的兼职小单,其中订单信息存储在MySQL数据库,而用户信息存储在PostgreSQL数据库。客户那边想有一个统一查询接口,可以通过SQL查询同时获取这两个数据源的信息。

为什么选择Apache Calcite?

简化开发流程

  • 抽象层次高: Apache Calcite 提供了高层次的抽象,使得开发者可以专注于业务逻辑,而不必处理底层的数据库连接和查询执行细节。
  • 减少重复工作: 通过使用Calcite,可以避免重复造轮子,节省开发时间和成本。

强大的SQL解析和优化能力

  • SQL标准支持: Apache Calcite 支持多种SQL方言(如MySQL、PostgreSQL等),可以无缝地处理不同数据库的SQL语句。
  • 查询优化: 内置的查询优化器可以根据不同的数据源特性进行智能优化,提高查询性能。

灵活性和可扩展性

  • 自定义模式和表: 可以通过编程方式动态地添加和管理多个数据源,每个数据源可以有不同的模式和表结构。
  • 插件机制: 支持各种插件,可以根据需求灵活扩展功能,例如自定义函数、聚合操作等。

高性能

  • 内存计算: Apache Calcite 支持内存中的数据处理,减少了I/O开销,提高了查询速度。
  • 分布式计算: 虽然本项目主要关注单机版实现,但Apache Calcite也可以扩展到分布式环境中,支持大规模数据集的处理。

集成性强

  • 与其他工具集成: 支持与其他大数据工具和技术栈(如Apache Flink、Presto等)集成,形成完整的数据分析解决方案。

哪些公司使用了Apache Calcite?

  • Google 在其内部的一些数据处理系统中使用 Apache Calcite,特别是在需要高性能和灵活性的场景下。
  • IBM 在其数据仓库和分析解决方案中使用 Apache Calcite,以提高查询性能和灵活性。
  • Intel 使用 Apache Calcite 来支持其大数据分析工具和解决方案,特别是在内存计算方面。
  • Alibaba Cloud: 阿里巴巴云在其大数据平台中使用 Apache Calcite 提供强大的查询优化和执行能力。
  • MaxCompute (ODPS): 阿里巴巴的大规模数据计算服务 MaxCompute 使用 Calcite 进行 SQL 查询处理。
  • Elasticsearch 的某些高级功能,如 Kibana 中的复杂查询,依赖于 Apache Calcite 进行 SQL 解析和优化。
  • Netflix 使用 Apache Calcite 来构建其内部的数据虚拟化层,支持复杂的查询和数据分析需求。
  • Microsoft 在其一些大数据产品和服务中使用 Apache Calcite,例如 Azure Synapse Analytics。
  • Teradata 使用 Apache Calcite 来增强其数据库系统的查询优化和执行性能。
  • Uber 使用 Apache Calcite 来处理其庞大的数据集,并支持复杂的查询和数据分析需求。

应用场景

数据虚拟化

  • 虚拟数据层: 创建一个虚拟的数据层,将分散在不同系统中的数据集中起来,提供统一的视图。
  • 动态数据源管理: 动态地添加和管理数据源,支持灵活的数据架构设计。

商业智能 (BI) 工具

  • 报表生成: 作为 BI 工具的核心组件,支持复杂的报表生成和数据分析。
  • 自助服务分析: 提供自助服务分析功能,允许非技术人员进行数据探索和分析。

机器学习与人工智能

  • 特征工程: 在机器学习管道中使用 Calcite 进行特征提取和数据准备。
  • 模型训练: 结合其他 AI 框架,利用 Calcite 进行大规模数据集的查询和处理。

多数据源查询

  • 统一接口访问多个数据库: 允许用户通过单一接口查询存储在不同数据库(如 MySQL、PostgreSQL、Oracle 等)中的数据。
  • 联合查询: 支持跨数据源的复杂 SQL 查询,例如从不同的数据库中获取相关联的数据。

大数据平台集成

  • 与 Hadoop 生态系统集成: 与 Hive、HBase、Druid 等大数据工具结合,提供统一的查询接口。
  • 流处理与批处理: 支持 Apache Flink 和 Apache Beam 等流处理框架,实现实时数据分析。

嵌入式数据库

  • 轻量级数据库引擎: 提供一个轻量级的 SQL 引擎,适用于嵌入式应用程序和内存数据库。
  • 内存计算: 利用内存计算加速查询性能,适合需要快速响应的应用场景。

数据湖解决方案

  • 统一元数据管理: 提供统一的元数据管理和查询接口,方便数据湖的建设和维护。
  • 多样化数据格式支持: 支持多种数据格式(如 JSON、Parquet、ORC 等),满足不同类型的数据存储需求。

代码实操


    
    
        org.springframework.boot
        spring-boot-starter-web
    

    
    
        org.apache.calcite
        calcite-core
        1.32.0
    

    
    
        com.zaxxer
        HikariCP
    

    
    
        mysql
        mysql-connector-java
        runtime
    

    
    
        org.postgresql
        postgresql
        runtime
    

    
    
        org.springframework.boot
        spring-boot-starter-test
        test
    

application.yml

spring:
  datasource:
    order-db:
      url:jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC
      username:root
      password:root
      driver-class-name:com.mysql.cj.jdbc.Driver
    user-db:
      url:jdbc:postgresql://localhost:5432/user_db
      username:postgres
      password:postgres
      driver-class-name:org.postgresql.Driver

jpa:
    show-sql:true
    hibernate:
      ddl-auto:update
    properties:
      hibernate:
        dialect:org.hibernate.dialect.MySQL8Dialect

数据源配置

package com.example.multids.config;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
publicclass DataSourceConfig {

    @Bean(name = "mysqlDataSource")
    public DataSource mysqlDataSource() {
        // 配置MySQL数据源
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/order_db?useSSL=false&serverTimezone=UTC");
        config.setUsername("root");
        config.setPassword("root");
        returnnew HikariDataSource(config);
    }

    @Bean(name = "postgresDataSource")
    public DataSource postgresDataSource() {
        // 配置PostgreSQL数据源
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/user_db");
        config.setUsername("postgres");
        config.setPassword("postgres");
        returnnew HikariDataSource(config);
    }
}

自定义数据源工厂

package com.example.multids.factory;

import com.example.multids.schema.MySchemas;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

publicclass DataSourceFactory {

    public static CalciteConnection createConnection(DataSource mysqlDataSource, DataSource postgresDataSource) throws SQLException {
        // 定义Calcite模型JSON字符串
        String modelJson = "{
" +
                "  "version": "1.0",
" +
                "  "defaultSchema": "my_schemas",
" +
                "  "schemas": [
" +
                "    {
" +
                "      "name": "my_schemas",
" +
                "      "type": "custom",
" +
                "      "factory": "" + ReflectiveSchema.Factory.class.getName() + "",
" +
                "      "operand": {
" +
                "        "class": "" + MySchemas.class.getName() + ""
" +
                "      }
" +
                "    }
" +
                "  ]
" +
                "}";
        
        // 创建Calcite连接
        Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + modelJson);
        CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);

        // 获取根模式并添加子模式
        SchemaPlus schema = calciteConnection.getRootSchema().getSubSchema("my_schemas");
        schema.add("orders", JdbcSchema.create(calciteConnection.getRootSchema(), "orders", mysqlDataSource, null, Lex.MYSQL));
        schema.add("users", JdbcSchema.create(calciteConnection.getRootSchema(), "users", postgresDataSource, null, Lex.POSTGRESQL));

        return calciteConnection;
    }
}

自定义模式

package com.example.multids.schema;

import org.apache.calcite.schema.impl.AbstractSchema;

import java.util.Map;

public class MySchemas extends AbstractSchema {
    @Override
    protected Map getTableMap() {
        // 返回表映射,这里不需要额外处理
        return super.getTableMap();
    }
}

查询控制器

package com.example.multids.controller;

import com.example.multids.factory.DataSourceFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

@RestController
publicclass QueryController {

    privatefinal DataSource mysqlDataSource;
    privatefinal DataSource postgresDataSource;

    @Autowired
    public QueryController(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
                           @Qualifier("postgresDataSource") DataSource postgresDataSource) {
        this.mysqlDataSource = mysqlDataSource;
        this.postgresDataSource = postgresDataSource;
    }

    @GetMapping("/query")
    public List> query(@RequestParam String sql) throws SQLException {
        // 创建Calcite连接
        CalciteConnection connection = DataSourceFactory.createConnection(mysqlDataSource, postgresDataSource);
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery(sql);

        // 处理查询结果
        List> result = new ArrayList<>();
        while (resultSet.next()) {
            int columnCount = resultSet.getMetaData().getColumnCount();
            List row = new ArrayList<>();
            for (int i = 1; i <= columnCount; i++) {
                row.add(resultSet.getString(i));
            }
            result.add(row);
        }

        // 关闭资源
        resultSet.close();
        statement.close();
        connection.close();

        return result;
    }
}

Application

package com.example.multids;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MultidsApplication {
    public static void main(String[] args) {
        SpringApplication.run(MultidsApplication.class, args);
    }
}

测试

MySQL orders 表

CREATE TABLE orders (
    id INT PRIMARY KEY,
    user_id INT,
    amount DECIMAL(10, 2),
    order_date DATETIME
);

PostgreSQL users 表

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
);

测试执行一个联合查询,从两个不同的数据源中获取数据,SQL语句是:

SELECT o.id AS order_id, u.name AS user_name, o.amount, o.order_date
FROM orders o
JOIN users u ON o.user_id = u.id;

测试结果

$ curl -X GET "http://localhost:8080/query?sql=SELECT%20o.id%20AS%20order_id,%20u.name%20AS%20user_name,%20o.amount,%20o.order_date%20FROM%20orders%20o%20JOIN%20users%20u%20ON%20o.user_id%20=%20u.id"

[
    ["1", "Alice", "199.99", "2025-04-10 21:30:00"],
    ["2", "Bob", "250.75", "2025-04-10 20:45:00"]
]

本文地址:https://www.yitenyun.com/119.html

搜索文章

Tags

数据库 API FastAPI Calcite 电商系统 MySQL Web 应用 异步数据库 数据同步 ACK 双主架构 循环复制 TIME_WAIT 运维 负载均衡 JumpServer SSL 堡垒机 跳板机 HTTPS HexHub Docker 服务器 管理口 服务器性能 JumpServer安装 堡垒机安装 Linux安装JumpServer Deepseek 宝塔面板 Linux宝塔 生命周期 esxi esxi6 root密码不对 无法登录 web无法登录 SQL 查询 序列 核心机制 Windows Windows server net3.5 .NET 安装出错 HTTPS加密 Windows宝塔 Mysql重置密码 开源 PostgreSQL 存储引擎 锁机制 宝塔面板打不开 宝塔面板无法访问 查看硬件 Linux查看硬件 Linux查看CPU Linux查看内存 行业 趋势 Oracle 处理机制 无法访问宝塔面板 Undo Log 机制 Spring Redis 异步化 监控 优化 万能公式 连接控制 InnoDB 数据库锁 机器学习 动态查询 Serverless 无服务器 语言 响应模型 ES 协同 group by 索引 技术 openHalo 分页查询 scp Linux的scp怎么用 scp上传 scp下载 scp命令 缓存方案 缓存架构 缓存穿透 Postgres OTel Iceberg 工具 存储 高可用 GreatSQL 连接数 数据 主库 SVM Embedding R edis 线程 日志文件 MIXED 3 国产数据库 Linux 安全 R2DBC Netstat Linux 服务器 端口 SQLite-Web SQLite 数据库管理工具 加密 场景 启动故障 ​Redis 推荐模型 Recursive 自定义序列化 防火墙 黑客 云原生 SQLark RocketMQ 长轮询 配置 共享锁 向量数据库 大模型 OB 单机版 AI 助手 PG DBA Hash 字段 Rsync 信息化 智能运维 Ftp 不宕机 磁盘架构 架构 电商 系统 向量库 Milvus 数据分类 Canal Python 修改DNS Centos7如何修改DNS 业务 IT运维 流量 传统数据库 向量化 • 索引 • 数据库 线上 库存 预扣 filelock 分库 分表 sftp 服务器 参数 MVCC 人工智能 推荐系统 语句 MySQL 9.3 PostGIS mini-redis INCR指令 redo log 重做日志 同城 双活 聚簇 非聚簇 频繁 Codis MongoDB MCP 开放协议 失效 Doris SeaTunnel 高效统计 今天这篇文章就跟大家 缓存 Redisson 锁芯 数据类型 虚拟服务器 虚拟机 内存 工具链 事务 Java 开发 主从复制 代理 INSERT COMPACT prometheus Alert 数据备份 千万级 大表 数据结构 ZODB 窗口 函数 网络架构 网络配置 发件箱模式 SSH 容器 QPS 高并发 EasyExcel MySQL8 引擎 性能 Web 分布式架构 分布式锁​ 聚簇索引 非聚簇索引 崖山 新版本 数据脱敏 加密算法 B+Tree ID 字段 分布式 集中式 RDB AOF 核心架构 订阅机制 Go 数据库迁移 Redis 8.0 分页 速度 服务器中毒 Web 接口 播客 OAuth2 Token 数据页 数据集成工具 读写 自动重启 网络故障 模型 StarRocks 数据仓库 容器化 排行榜 排序 池化技术 连接池 微软 SQL Server AI功能 Redka DBMS 管理系统 SpringAI JOIN MGR 分布式集群 原子性 Entity Caffeine CP 部署 业务场景 事务隔离 网络 LRU Testcloud 云端自动化 分页方案 排版 数据字典 兼容性 dbt 数据转换工具 Valkey Valkey8.0 Pottery ReadView 优化器 sqlmock 1 事务同步 悲观锁 乐观锁 关系数据库 意向锁 记录锁 日志 Weaviate 对象 单点故障 单线程 UUIDv7 主键 AIOPS 仪表盘 UUID ID InfluxDB Order 编程 Ansible Pump Crash 代码 RAG HelixDB 双引擎 分布式锁 Zookeeper 产业链 IT 恢复数据 订单 字典 LLM List 类型 线程安全 国产 用户 慢SQL优化 表空间 拦截器 动态代理 解锁 调优 Next-Key RR 互联网 GitHub Git 快照读 当前读 视图 神经系统 矢量存储 数据库类型 AI代理 查询规划 count(*) count(主键) 行数 CAS 算法 技巧 多线程 并发控制 恢复机制 闪回