Postgres 消息队列 (PGMQ)
一个轻量级消息队列。类似于 AWS SQS 和 RSMQ,但基于 Postgres。
源码:https://github.com/tembo-io/pgmq
特性
- 轻量级 - 无需后台工作进程或外部依赖,仅使用打包在扩展中的 Postgres 函数
- 在可见性超时内保证消息对消费者的"恰好一次"传递
- 与 AWS SQS 和 RSMQ 的 API 兼容
- 消息在队列中保留,直到被明确删除
- 消息可以被归档而不是删除,以实现长期保留和重放
支持
Postgres 12-16。
目录
安装
最快的入门方式是运行 Tembo Docker 镜像,其中 PGMQ 已预装在 Postgres 中。
docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
如果您想从源码构建,可以按照 CONTRIBUTING.md 中的说明进行操作。
更新
要更新 PGMQ 版本,请按照 UPDATING.md 中的说明进行操作。
客户端库
社区
- Dart
- Go
- Elixir
- Elixir + Broadway
- Java (Spring Boot)
- Kotlin JVM (JDBC)
- Javascript (NodeJs)
- .NET
- Python(使用 SQLAlchemy)
SQL 示例
# 连接到 Postgres
psql postgres://postgres:postgres@0.0.0.0:5432/postgres
-- 在 "pgmq" 模式中创建扩展
CREATE EXTENSION pgmq;
创建队列
每个队列都是 pgmq
模式中的一个表。表名是队列名称前缀为 q_
。
例如,pgmq.q_my_queue
是队列 my_queue
的表。
-- 创建队列
SELECT pgmq.create('my_queue');
create
-------------
(1 row)
发送两条消息
-- 消息以 JSON 格式发送
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar1"}'
);
发送函数返回消息 ID。
send
-----------
1
(1 row)
-- 可选择提供延迟
-- 这条消息将在队列中,但在 5 秒内无法被消费
SELECT * from pgmq.send(
queue_name => 'my_queue',
msg => '{"foo": "bar2"}',
delay => 5
);
send
-----------
2
(1 row)
读取消息
从队列中读取 2
条消息。使它们在 30
秒内不可见。
如果这些消息在 30 秒内未被删除或归档,它们将再次变为可见,
并可以被另一个消费者读取。
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 2
);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+-----------------
1 | 1 | 2023-08-16 08:37:54.567283-05 | 2023-08-16 08:38:29.989841-05 | {"foo": "bar1"}
2 | 1 | 2023-08-16 08:37:54.572933-05 | 2023-08-16 08:38:29.989841-05 | {"foo": "bar2"}
如果队列为空,或者所有消息当前不可见,则不会返回任何行。
SELECT * FROM pgmq.read(
queue_name => 'my_queue',
vt => 30,
qty => 1
);
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------+----+---------
弹出消息
-- 读取一条消息并立即从队列中删除。如果队列为空或所有消息不可见,则返回空记录。
SELECT * FROM pgmq.pop('my_queue');
msg_id | read_ct | enqueued_at | vt | message
--------+---------+-------------------------------+-------------------------------+-----------------
1 | 1 | 2023-08-16 08:37:54.567283-05 | 2023-08-16 08:38:29.989841-05 | {"foo": "bar1"}
归档消息
归档消息会将其从队列中移除并插入到归档表中。
-- 归档 msg_id=2 的消息。
SELECT pgmq.archive(
queue_name => 'my_queue',
msg_id => 2
);
archive
--------------
t
(1 row)
或者使用 msg_ids
(复数)参数在一个操作中归档多条消息:
首先,发送一批消息
SELECT pgmq.send_batch(
queue_name => 'my_queue',
msgs => ARRAY['{"foo": "bar3"}','{"foo": "bar4"}','{"foo": "bar5"}']::jsonb[]
);
send_batch
------------
3
4
5
(3 rows)
然后使用 msg_ids(复数)参数归档它们。
SELECT pgmq.archive(
queue_name => 'my_queue',
msg_ids => ARRAY[3, 4, 5]
);
archive
---------
3
4
5
(3 rows)
可以直接使用 SQL 检查归档表。
归档表在 pgmq
模式中有 a_
前缀。
SELECT * FROM pgmq.a_my_queue;
msg_id | read_ct | enqueued_at | archived_at | vt | message
--------+---------+-------------------------------+-------------------------------+-------------------------------+-----------------
2 | 0 | 2024-08-06 16:03:41.531556+00 | 2024-08-06 16:03:52.811063+00 | 2024-08-06 16:03:46.532246+00 | {"foo": "bar2"}
3 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587272+00 | {"foo": "bar3"}
4 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587508+00 | {"foo": "bar4"}
5 | 0 | 2024-08-06 16:03:58.586444+00 | 2024-08-06 16:04:02.85799+00 | 2024-08-06 16:03:58.587543+00 | {"foo": "bar5"}
删除消息
再发送一条消息,以便我们可以删除它。
SELECT pgmq.send('my_queue', '{"foo": "bar6"}');
send
-----------
6
(1 row)
从名为 my_queue
的队列中删除 ID 为 6
的消息。
SELECT pgmq.delete('my_queue', 6);
delete
-------------
t
(1 row)
删除队列
删除队列 my_queue
。
SELECT pgmq.drop_queue('my_queue');
drop_queue
-----------------
t
(1 row)
配置
分区队列
如果你想使用 pgmq
分区队列,你需要安装 pg_partman。
pgmq
队列表可以通过使用 pgmq.create_partitioned()
创建为分区表。pg_partman 处理所有队列表的维护工作。这包括创建新分区和删除旧分区。
分区行为在创建队列时通过 pgmq.create_partitioned()
进行配置。该函数有三个参数:
queue_name: text
:队列的名称。队列是以 q_
为前缀的 Postgres 表。例如,q_my_queue
。归档表则以 a_
为前缀,例如 a_my_queue
。
partition_interval: text
- 创建分区的间隔。可以是 pg_partman 支持的任何有效 Postgres Duration
,或整数值。当它是一个持续时间时,队列按消息发送到表的时间(enqueued_at
)进行分区。值为 'daily'
将每天创建一个新分区。当它是整数值时,队列按 msg_id
分区。值为 '100'
将每 100 条消息创建一个新分区。该值必须与 retention_interval
(基于时间或数值)一致。默认值为 daily
。对于归档表,当间隔为整数值时,将按 msg_id
分区。在持续时间的情况下,它将在 archived_at
上分区,而不像队列表。
retention_interval: text
- 保留分区的间隔。可以是 pg_partman 支持的任何有效 Postgres Duration
,或整数值。当它是一个持续时间时,将删除包含超过该持续时间数据的分区。当它是整数值时,将删除任何 msg_id
小于 max(msg_id) - retention_interval
的消息。例如,如果最大 msg_id
是 100,retention_interval
是 60,则任何 msg_id
值小于 40 的分区都将被删除。该值必须与 partition_interval
(基于时间或数值)一致。默认值为 '5 days'
。注意:retention_interval
不适用于通过 pgmq.delete()
删除或通过 pgmq.archive()
归档的消息。pgmq.delete()
永久删除消息,pgmq.archive()
将消息永久移至相应的归档表(例如,a_my_queue
)。
为了进行自动分区维护,必须在 postgresql.conf
文件中添加几个设置,该文件通常位于 postgres 的 DATADIR
中。
pg_partman_bgw.interval
在 postgresql.conf
中。以下是在 Tembo docker 镜像中设置的默认配置值。
将以下内容添加到 postgresql.conf
。注意,更改 shared_preload_libraries
需要重启 Postgres。
pg_partman_bgw.interval
设置 pg_partman
进行维护的间隔。这会创建新分区并删除超出 retention_interval
的分区。默认情况下,pg_partman
会保持当前活动分区"ahead"4个分区。
shared_preload_libraries = 'pg_partman_bgw' # 需要重启 Postgres
pg_partman_bgw.interval = 60
pg_partman_bgw.role = 'postgres'
pg_partman_bgw.dbname = 'postgres'
可见性超时(vt)
pgmq 在可见性超时内保证消息的精确一次传递。可见性超时是消息被消费者读取后对其他消费者不可见的时间。如果消息在可见性超时内未被删除或归档,它将再次变为可见并可被另一个消费者读取。可见性超时在通过 pgmq.read()
从队列读取消息时设置。建议将 vt
值设置为大于处理消息的预期时间。应用程序成功处理消息后,应调用 pgmq.delete()
从队列中完全删除消息,或调用 pgmq.archive()
将其移至队列的归档表。
谁在使用 pgmq?
随着 pgmq 社区的成长,我们很想了解谁在使用它。请发送 PR,包含您的公司名称和 @github 用户名。
目前,官方使用 pgmq 的有:
✨ 贡献者
感谢这些了不起的人: