通過創建觸發器和函數實現PostgreSQL增量DDL同步
更新時間 2024-02-22 11:03:21
最近更新時間: 2024-02-22 11:03:21
分享文章
本小結介紹PostgreSQL->RDS for PostgreSQL實時同步,通過在源庫創建觸發器和函數獲取源庫的DDL信息,然后在DRS增量實時同步階段實現DDL操作的同步。
前提條件
- 當前支持的DDL操作包含如下:
- 表級同步支持:TRUNCATE(僅PostgreSQL 11及以上版本支持)、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)。
- 庫級同步支持:TRUNCATE(僅PostgreSQL 11及以上版本支持)、CREATE SCHEMA/TABLE、DROP TABLE 、ALTER TABLE(包含ADD COLUMN、DROP COLUMN、ALTER COLUMN、RENAME COLUMN、ADD CONSTRAINT、DROP CONSTRAINT、RENAME)、CREATE SEQUENCE、DROP SEQUENCE、ALTER SEQUENCE、CREATE INDEX、ALTER INDEX、DROP INDEX、CREATE VIEW、ALTER VIEW。
注意
- 表級同步:RENAME表名之后,向更改名稱后的表插入新的數據時,DRS不會同步新的數據到目標庫。
- 庫級同步:源庫使用非CREATE TABLE方式創建的表不會同步到目標庫。常見地如:使用CREATE TABLE AS創建表、調用函數創建表。
- 暫不支持以注釋開頭的DDL語句的同步,以注釋開頭的DDL語句將被忽略。
- 不支持函數和存儲過程中DDL語句的同步,函數和存儲過程中執行的DDL語句將被忽略。
- 源庫和目標庫版本不同時,請使用源庫和目標庫都兼容的SQL語句執行DDL操作。例如:源庫為pg11,目標庫為pg12,要將源庫表的列類型從char修改為int時,請使用如下語句:
alter table tablename alter column columnname type int USING columnname::int;
- 執行如下操作步驟前,請檢查待同步的源數據庫public模式下,是否存在名為hwdrs_ddl_info的表、名為hwdrs_ddl_function()的函數、名為hwdrs_ddl_event的觸發器。如存在,請將其刪除。
- 庫級同步時,如創建無主鍵表,請執行如下命令,將無主鍵表復制屬性設置為full。
alter table tablename replica identity full;
操作步驟

說明若源庫為本云RDS for PostgreSQL,可以使用root用戶創建相關對象,如果執行時報“Must be superuser to create an event trigger”錯誤,可以通過工單申請處理。本云RDS for PostgreSQL的root用戶權限請參見RDS用戶指南。
步驟 1 使用擁有創建事件觸發器權限的用戶連接要同步的數據庫。
步驟 2 執行如下語句,創建存儲DDL信息的表。
DROP TABLE IF EXISTS public.hwdrs_ddl_info;
DROP SEQUENCE IF EXISTS public.hwdrs_ddl_info_id_seq;
CREATE TABLE public.hwdrs_ddl_info(
id ????????????????????????????bigserial primary key,
ddl ???????????????????????????text,
username ??????????????????????varchar(64) default current_user, ?
txid ??????????????????????????varchar(16) default txid_current()::varchar(16),
tag ???????????????????????????varchar(64), ?
database ??????????????????????varchar(64) default current_database(), ?
schema ????????????????????????varchar(64) default current_schema,
client_address ????????????????varchar(64) default inet_client_addr(),
client_port ???????????????????integer default inet_client_port(),
event_time ????????????????????timestamp default current_timestamp
);
步驟 3 執行如下語句,創建函數。
CREATE OR REPLACE FUNCTION public.hwdrs_ddl_function()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY INVOKER
AS ?BODYBODY?
declare ddl text;
declare real_num int;
declare max_num int := 50000;
begin
if (tg_tag in ('CREATE TABLE','ALTER TABLE','DROP TABLE','CREATE SCHEMA','CREATE SEQUENCE','ALTER SEQUENCE','DROP SEQUENCE','CREATE VIEW','ALTER VIEW','DROP VIEW','CREATE INDEX','ALTER INDEX','DROP INDEX')) then
select current_query() into ddl; ?
insert into public.hwdrs_ddl_info(ddl, username, txid, tag, database, schema, client_address, client_port, event_time)
values (ddl, current_user, cast(txid_current() as varchar(16)), tg_tag, current_database(), current_schema, ?inet_client_addr(), inet_client_port(), current_timestamp);
select count(id) into real_num from public.hwdrs_ddl_info;
if real_num > max_num then
if current_setting('server_version_num')::int<100000 then
delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and not exists (select 0 from pg_locks l join pg_database d on l.database=d.oid where d.datname=current_catalog and pid<>pg_backend_pid() and locktype='relation' and relation=to_regclass('public.hwdrs_ddl_info_pkey')::oid and mode='RowExclusiveLock');
else ?
delete from public.hwdrs_ddl_info where id<(select min(id)+1000 from public.hwdrs_ddl_info) and (xmax=0 or coalesce(txid_status(xmax::text::bigint), '')<>'in progress');
end if;
end if;
end if;
end;
??BODYBODY?;
步驟 4 執行以下語句,為步驟2和步驟3中創建的對象賦予必要權限。
GRANT USAGE ON SCHEMA public TO public;
GRANT SELECT,INSERT,DELETE ON TABLE public.hwdrs_ddl_info TO public;
GRANT SELECT,USAGE ON SEQUENCE public.hwdrs_ddl_info_id_seq TO public;
GRANT EXECUTE ON FUNCTION public.hwdrs_ddl_function() TO public;
步驟 5 執行以下語句,創建DDL事件觸發器。
CREATE EVENT TRIGGER hwdrs_ddl_event ON ddl_command_end EXECUTE PROCEDURE public.hwdrs_ddl_function();
步驟 6 執行以下語句,將創建的事件觸發器設置為enable。
ALTER EVENT TRIGGER hwdrs_ddl_event ENABLE ALWAYS;
步驟 7 返回數據復制服務控制臺,創建PostgreSQL->RDS for PostgreSQL的同步任務。
步驟 8 待同步任務結束后,請執行下語句刪除創建的表、函數、觸發器。
DROP EVENT trigger hwdrs_ddl_event;
DROP FUNCTION public.hwdrs_ddl_function();
DROP TABLE public.hwdrs_ddl_info;