从 Kafka 加载数据
Kafka Foreign Data Wrapper (FDW) 提供了 Apache Cloudberry 与 Apache Kafka 连接的能力,使得数据库能够直接从 Kafka 中读取数据,并将其作为外部表来处理。Apache Cloudberry 用户可以更高效、灵活、可靠地处理 Kafka 中的实时数据,从而提高数据处理能力和业务效率。
Apache Cloudberry 支持使用 Kafka FDW 来创建外部表以及导入数据。
更多信息,参考 Kafka FDW 仓库:https://github.com/cloudberry-contrib/kafka_fdw。注意,该仓库由社区成员贡献,但不是由 PPMC 维护的官方社区项目。
基本使用
-
创建
kafka_fdw
扩展。CREATE EXTENSION kafka_fdw;
-
创建外部服务器,指定 Kafka 的集群地址。你需要将
localhost:9092
替换为你的 Kafka 集群地址。CREATE SERVER kafka_server
FOREIGN DATA WRAPPER kafka_fdw
OPTIONS (mpp_execute 'all segments', brokers 'localhost:9092'); -
创建 user mapping。
CREATE USER MAPPING FOR PUBLIC SERVER kafka_server;
-
创建外部表
创建外部表时,必须指定两个元数据信息列
partition
和offset
,用于标识 Kafka 中的一个 Topic 的消息所属的分区和偏移。下面是一个示例:CREATE FOREIGN TABLE kafka_test (
part int OPTIONS (partition 'true'),
offs bigint OPTIONS (offset 'true'),
some_int int,
some_text text,
some_date date,
some_time timestamp
)
SERVER kafka_server OPTIONS
(format 'csv', topic 'contrib_regress_csv', batch_size '1000', buffer_delay '1000');参数说明:
batch_size
:从 Kafka 读取一次数据的量。buffer_delay
:从 Kafka 获取数据的超时时间。
支持的数据格式
目前支持 CSV
和 JSON
两种数据格式。
查询
可以在查询的时候指定消息的分区和偏移,指定 partition
或 offset
:
SELECT * FROM kafka_test WHERE part = 0 AND offs > 1000 LIMIT 60;
也可以指定多个条件:
SELECT * FROM kafka_test WHERE (part = 0 AND offs > 100) OR (part = 1 AND offs > 300) OR (part = 3 AND offs > 700);