本小節介紹PostgreSQL為源的同步鏈路在強制結束任務后,如何清理源庫邏輯復制槽、如何同步序列值以及如何在源數據庫已經無法連接的情況下,重置目標庫中自增或自減列關聯序列的序列值。
清理源庫邏輯復制槽
步驟 1 使用對應DRS任務的源數據庫連接用戶,登錄該同步任務的源數據庫。
步驟 2 查詢同步任務選擇的database對象所對應的流復制槽名稱。
select slot_name from pg_replication_slots where database = 'database';
注意其中database為DRS同步任務中選擇同步的database。
步驟 3 執行如下語句,刪除對應的流復制槽。
select * from pg_drop_replication_slot('slot_name');
注意其中slot_name為步驟2中查詢的流復制槽名稱。
步驟 4 執行如下語句,查詢流復制槽是否成功刪除。
select slot_name from pg_replication_slots where slot_name = 'slot_name';
查詢結果為空表示DRS同步任務對應的流復制槽已成功刪除。
同步序列值
如果未同步序列對象或者目標庫為GaussDB(for openGauss),可忽略此節。
步驟 1 使用高權限賬號(需要具有所有序列的USAGE權限)連接對應DRS任務同步的源數據庫,執行如下語句。
select 'SELECT pg_catalog.setval('||quote_literal(quote_ident(n.nspname)||'.'||quote_ident(c.relname))||', '||nextval(c.oid)||');' as sqls from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' and not (c.relname='hwdrs_ddl_info_id_seq' and n.nspname='public') order by n.nspname, c.relname;
查詢結果為需要在目標數據庫中執行的sql語句。
步驟 2 使用對應DRS任務的目標數據庫鏈接用戶連接該任務同步的目標數據庫,在目標庫中執行步驟[步驟1]( " ")中查詢出的sql語句。
步驟 3 在目標庫中執行如下語句,檢查序列值同步結果。
SELECT n.nspname, c.relname, nextval(c.oid) from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' order by 1,2;
源庫無法連接時,重置目標庫中的序列值
在某些極端場景下,源數據庫可能已經被損壞而無法連接,此時仍需將目標庫中與自增或自減列相關聯的序列值進行重置。如果源數據可以連接,請忽略此節,參考上節操作即可。
步驟 1 使用對應DRS任務的目標數據庫測試鏈接用戶,登錄該同步任務的目標數據庫。
步驟 2 使用如下語句,查詢出將nextval作為表列默認值的序列對應的序列值重置sql語句。
set search_path to ''; select 'SELECT pg_catalog.setval('||quote_literal(quote_ident(s.sequence_schema)||'.'||quote_ident(s.sequence_name))||', (SELECT '||case when s.increment::int<0 then 'min(' else 'max(' end|| quote_ident(c.column_name)||')'||case when s.increment::int<0 then '-1' else '+1' end||' FROM '||quote_ident(c.table_schema)||'.'||quote_ident(c.table_name)||'));' as sqls from information_schema.columns c join information_schema.sequences s on (position(quote_literal (quote_ident(s.sequence_schema)||'.'||quote_ident(s.sequence_name))||'::regclass' in c.column_default) > 0) where c.data_type in ('bigint', 'int', 'integer', 'smallint', 'numeric', 'real', 'double precision', 'double') and c.column_default like 'nextval(%%' order by s.sequence_schema, s.sequence_name;
查詢結果為需要在目標數據庫中執行的sql語句。
步驟 3 如果源庫的版本小于10.0,請忽略此步。如果源庫的版本不小于10.0,請在目標庫中執行如下語句查詢出重置表標識列附加序列的對應序列值的sql語句。
set search_path to ''; select 'SELECT pg_catalog.setval('||quote_literal(seqname)||', (SELECT '||case when increment::int<0 then 'min(' else 'max(' end||colname||')'||case when increment::int<0 then '-1' else '+1' end||' FROM '||tablename||'));' as sqls from (select objid::regclass::text, refobjid::regclass::text, (pg_identify_object(refclassid,refobjid,refobjsubid)).identity, (pg_sequence_parameters(objid)).increment from pg_depend where deptype='i' and refobjsubid>0 and objid in (select c.oid from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind='S' and n.nspname !~ '^pg_' and n.nspname<>'information_schema')) p(seqname,tablename,colname,increment);
查詢結果為需要在目標數據庫中執行的sql語句。
步驟 4 在目標庫中執行步驟[步驟2]( " ")和步驟[步驟3]( " ")中查詢出的sql語句。
步驟 5 在目標庫中執行如下語句,檢查序列值同步結果。
SELECT n.nspname, c.relname, nextval(c.oid) from pg_class c join pg_namespace n on c.relnamespace=n.oid where c.relkind = 'S' and n.nspname !~'^pg_' and n.nspname<>'information_schema' order by 1,2;