关于 pg_replicate
pg_replicate
是一个用于快速构建 Postgres 复制解决方案的 Rust crate。它提供了构建数据管道的基础模块,可以持续地将数据从 Postgres 复制到其他系统。它在 Postgres 的逻辑流复制协议之上构建抽象,引导用户走向成功之路,而无需担心协议的底层细节。
快速入门
要快速尝试 pg_replicate
,您可以运行 stdout
示例,它会将数据复制到标准输出。首先,在 Postgres 中创建一个包含您想要复制的表的发布:
create publication my_publication
for table table1, table2;
然后运行 stdout
示例:
cargo run --example stdout -- --db-host localhost --db-port 5432 --db-name postgres --db-username postgres --db-password password cdc my_publication stdout_slot
在上面的示例中,pg_replicate
连接到运行在 localhost:5432
上的名为 postgres
的 Postgres 数据库,使用用户名 postgres
和密码 password
。槽名 stdout_slot
将由 pg_replicate
自动创建。
参考示例文件夹以运行除 stdout
之外的其他接收器的示例(目前仅支持 bigquery
和 duckdb
)。小提示:要查看所有命令行选项,可以不指定任何选项运行示例,例如 cargo run --example bigquery
将打印 bigquery
接收器的详细使用说明。
开始使用
要在您的 Rust 项目中使用 pg_replicate
,请通过 Cargo.toml
中的 git 依赖添加它:
[dependencies]
pg_replicate = { git = "https://github.com/supabase/pg_replicate" }
目前需要 git 依赖,因为 pg_replicate
尚未发布在 crates.io 上。您还需要添加 tokio 的依赖:
[dependencies]
...
tokio = { version = "1.38" }
现在您的 main.rs
可以包含如下代码:
use std::error::Error;
use pg_replicate::pipeline::{
data_pipeline::DataPipeline,
sinks::stdout::StdoutSink,
sources::postgres::{PostgresSource, TableNamesFrom},
PipelineAction,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let host = "localhost";
let port = 5432;
let database = "postgres";
let username = "postgres";
let password = Some("password".to_string());
let slot_name = Some("my_slot".to_string());
let table_names = TableNamesFrom::Publication("my_publication".to_string());
// 创建 PostgresSource
let postgres_source = PostgresSource::new(
host,
port,
database,
username,
password,
slot_name,
table_names,
)
.await?;
// 创建 StdoutSink。这个接收器只是将接收到的事件打印到标准输出
let stdout_sink = StdoutSink;
// 创建 `DataPipeline` 以连接源和接收器
let mut pipeline = DataPipeline::new(postgres_source, stdout_sink, PipelineAction::Both);
// 启动 `DataPipeline` 开始从 Postgres 复制数据到标准输出
pipeline.start().await?;
Ok(())
}
更多示例请参考源代码中的示例文件夹。
仓库结构
该仓库是一个 cargo 工作空间。每个子文件夹都是工作空间中的一个 crate。以下是每个 crate 的简要说明:
api
- 用于在云环境中托管pg_replicate
的 REST API。pg_replicate
- 包含核心逻辑的主要库 crate。replicator
- 使用pg_replicate
的二进制 crate。打包为 Docker 容器以用于云托管。
路线图
pg_replicate
仍在积极开发中,所以可能会有 bug 和小问题,但随着时间的推移,我们计划添加以下接收器:
- 添加 BigQuery 接收器
- 添加 DuckDb 接收器
- 添加 MotherDuck 接收器
- 添加 Snowflake 接收器
- 添加 ClickHouse 接收器
- 更多即将推出...
注意:DuckDb 和 MotherDuck 接收器不使用批处理管道,因此目前性能较差。计划开发这些接收器的批处理管道版本。
查看未解决的问题以获取提议功能的完整列表(以及已知问题)。
许可证
根据 Apache-2.0 许可证分发。有关更多信息,请参阅 LICENSE
。
Docker
要创建 replicator
的 Docker 镜像,请在仓库根目录运行 docker build -f ./replicator/Dockerfile .
。同样,要创建 api
的 Docker 镜像,请运行 docker build -f ./api/Dockerfile .
。
设计
应用程序可以使用 pg_replicate
中的数据源和接收器来构建数据管道,以持续从源复制数据到接收器。例如,一个从 Postgres 复制数据到 DuckDB 的数据管道只需要约 100 行 Rust 代码。
数据管道中有三个组件:
- 数据源
- 数据接收器
- 管道
数据源是数据将被复制的对象。数据接收器是数据将被复制到的对象。管道是驱动从源到接收器的数据复制操作的对象。
+----------+ +----------+
| | | |
| 数据源 |---- 数据管道 ----> | 接收器 |
| | | |
+----------+ +----------+
所以大致上你写的代码是这样的:
let postgres_source = PostgresSource::new(...);
let duckdb_sink = DuckDbSink::new(..);
let pipeline = DataPipeline(postgres_source, duckdb_sink);
pipeline.start();
当然,实际代码不止这四行,但这是基本思路。完整示例请查看 duckdb 示例。
数据源
数据源是管道将复制到数据接收器的数据来源。目前,仓库只有一个数据源:PostgresSource
。PostgresSource
是主要数据源;任何其他源或接收器中的数据都源自它。
数据接收器
数据接收器是数据源的数据被复制到的地方。有两种类型的数据接收器。一种保留从 PostgresSource
输出的数据本质特性,另一种则不保留。前者可以在未来充当数据源,后者不能充当数据源,是数据的最终存放地。
例如,DuckDbSink
确保从源进来的变更数据捕获(CDC)流被物化为 DuckDB 数据库中的表。一旦完成这种有损数据转换,它就不能再用作 CDC 流。
相比之下,潜在的未来接收器 S3Sink
或 KafkaSink
只是按原样复制 CDC 流。存放在接收器中的数据稍后可以像直接从 Postgres 来的一样使用。
数据管道
数据管道封装了从源到接收器复制数据的业务逻辑。它还协调从上次停止的确切位置恢复 CDC 流。数据接收器通过持久化恢复状态并在重启时将其返回给管道来参与这个过程。
如果数据接收器不是事务性的(例如 S3Sink
),就不总是可能保持 CDC 流和恢复状态之间的一致性。这可能导致这些非事务性接收器有重复的 CDC 流部分。当数据被复制到像 DuckDB 这样的事务性存储时,数据管道有助于对这些重复的 CDC 事件进行去重。
最后,数据管道将 CDC 流已复制到接收器的日志序列号(LSN)报告回 PostgresSource
。这允许 Postgres 数据库通过移除数据接收器不再需要的 WAL 段文件来回收磁盘空间。
+----------+ +----------+
| | | |
| 数据源 |<---- LSN 编号 -----| 接收器 |
| | | |
+----------+ +----------+
数据复制类型
CDC 流不是数据管道执行的唯一数据类型。还有全表复制,也称为回填。这两种类型可以一起执行,也可以分开执行。例如,一次性数据复制可以使用回填。但如果你想定期从 Postgres 复制数据到 OLAP 数据库,就应该同时使用回填和 CDC 流。回填用于获取数据的初始副本,CDC 流用于保持这些副本随着 Postgres 中复制表的变化而更新。
性能
目前,数据源和接收器一次复制一个表行和 CDC 事件。这预计会很慢。批处理和其他策略可能会大大提高性能。但在这个早期阶段,重点是正确性而非性能。目前还没有任何基准测试,所以关于性能的评论更接近推测而非现实。