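After creating a data warehouse cluster, you can connect to it with the third-party library psycopg2, and then access DWS from Python and perform operations on data tables.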
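Preparations before connecting to the cluster
- An elastic IP address has been bound to the DWS cluster.
- You have obtained the database administrator username and password of the DWS cluster.
Notice: Because the MD5 algorithm has been proven to be vulnerable to collisions, its use for password verification is prohibited. DWS is secure by default and disables MD5 password verification by default, which may prevent open-source clients from connecting. Check whether the database parameter password_encryption_type is set to 1. If it is not, change it as described in "[Modifying Database Parameters](//www.daliqc.cn/document/10014061/10047788)" in the User Guide, and then change the password of the database user you plan to use once.
Note: For security reasons, DWS no longer stores MD5 password digests by default, so open-source drivers and clients cannot connect to the database. To use the MD5 authentication algorithm of the open-source protocol, adjust the password policy and then either create a new user or change the password of an existing user.
The database does not store the plaintext password. It stores a hash digest of the password and, during authentication, compares it with the digest sent by the client (salting is involved). When you change the password algorithm policy, the database therefore cannot recover your password to generate a digest with the new hash algorithm. You must change the password manually once or create a new user; the new password is then digested with the hash algorithm you configured and stored for subsequent connection authentication.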
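The reasoning in the note above can be illustrated with a small sketch. It is not the scheme DWS uses: the PBKDF2 construction, the iteration count, the algorithm names, and the sample password are all assumptions chosen only to show why a stored salted digest cannot be converted to another hash algorithm without the plaintext.

```python
import hashlib
import hmac
import os

ITERATIONS = 10_000  # illustrative value, not a DWS setting


def make_digest(password, salt, algorithm):
    """Return a salted digest of the password for the given hash algorithm."""
    return hashlib.pbkdf2_hmac(algorithm, password.encode("utf-8"), salt, ITERATIONS)


def change_password(new_password, algorithm):
    """Build the record a server could store: salt, algorithm, digest (no plaintext)."""
    salt = os.urandom(16)
    return {"algorithm": algorithm, "salt": salt,
            "digest": make_digest(new_password, salt, algorithm)}


def verify(candidate, record):
    """Recompute the digest from the candidate password and compare in constant time."""
    expected = make_digest(candidate, record["salt"], record["algorithm"])
    return hmac.compare_digest(expected, record["digest"])


# Record created while the (hypothetical) policy used sha256.
old_record = change_password("Passw0rd@example", "sha256")

# After the policy switches to sha512, the old digest cannot be transformed.
# A new record appears only when the user supplies the password again.
new_record = change_password("Passw0rd@example", "sha512")
```

Only `change_password` ever sees the plaintext, which is why a password change (or a new user) is needed after the policy is modified.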
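You have obtained the public network access address of the DWS cluster, including the IP address and port. For details, see [Obtaining the Cluster Connection Address](//www.daliqc.cn/document/10014061/10047782).
The psycopg2 third-party library has been installed. Download address: [//pypi.org/project/psycopg2/](//pypi.org/project/psycopg2/). For installation and deployment, see [//www.psycopg.org/install/](//www.psycopg.org/install/).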

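Note
On CentOS, Red Hat, and similar operating systems, install the library with yum: yum install python-psycopg2.
psycopg2 depends on the PostgreSQL libpq dynamic library (a 32-bit psycopg2 requires a 32-bit libpq; a 64-bit psycopg2 requires a 64-bit libpq). On Linux, yum can resolve this dependency. On Windows, libpq must be installed before psycopg2 is used, in one of two ways:
- Install PostgreSQL and add the locations of the libpq, ssl, and crypto dynamic libraries to the PATH environment variable.
- Install psqlodbc and use the libpq, ssl, and crypto dynamic libraries shipped with the PostgreSQL ODBC driver.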
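After installation, it can help to confirm that psycopg2 imports and to see which libpq it loaded. The following is a minimal sketch; the helper name `describe_psycopg2` is hypothetical, while `psycopg2.__version__` and `psycopg2.extensions.libpq_version()` are part of psycopg2 (the latter from version 2.7).

```python
import importlib.util


def describe_psycopg2():
    """Return psycopg2 and libpq version details, or None if psycopg2 is absent."""
    if importlib.util.find_spec("psycopg2") is None:
        return None  # package not installed in this interpreter
    try:
        import psycopg2
        from psycopg2 import extensions
    except ImportError as exc:
        # Typically raised when the libpq/ssl/crypto libraries cannot be loaded.
        return {"error": str(exc)}
    return {
        "psycopg2": psycopg2.__version__,
        "libpq": extensions.libpq_version(),  # integer such as 130004
    }


print(describe_psycopg2())
```

An `error` entry on Windows usually suggests that the directory containing the libpq dynamic library is not yet on PATH.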
Constraints
psycopg2 is a client interface built for PostgreSQL, so DWS does not support all of its features. See the following table for details.

Note: The support status below is based on Python 3.8.5 and psycopg2 2.9.1.
DWS support for the main psycopg2 interfaces
| Class | Category | Function/Member variable | Supported | Remarks |
|---|---|---|---|---|
| connections | basic | cursor(name=None,cursor_factory=None,scrollable=None,withhold=False) | Y | - |
| connections | basic | commit() | Y | - |
| connections | basic | rollback() | Y | - |
| connections | basic | close() | Y | - |
| connections | Two-phase commit support methods | xid(format_id,gtrid,bqual) | Y | - |
| connections | Two-phase commit support methods | tpc_begin(xid ) | Y | - |
| connections | Two-phase commit support methods | tpc_prepare() | N | The kernel does not support explicit PREPARE TRANSACTION |
| connections | Two-phase commit support methods | tpc_commit([xid ]) | Y | - |
| connections | Two-phase commit support methods | tpc_rollback([xid ]) | Y | - |
| connections | Two-phase commit support methods | tpc_recover() | Y | - |
| connections | Two-phase commit support methods | closed | Y | - |
| connections | Two-phase commit support methods | cancel() | Y | - |
| connections | Two-phase commit support methods | reset() | N | DISCARD ALL is not supported |
| connections | Two-phase commit support methods | dsn | Y | - |
| connections | Transaction control methods and attributes. | set_session(isolation_level=None,readonly=None,deferrable=None,autocommit=None) | Y | The database does not support setting default_transaction_read_only in a session |
| connections | Transaction control methods and attributes. | autocommit | Y | - |
| connections | Transaction control methods and attributes. | isolation_level | Y | - |
| connections | Transaction control methods and attributes. | readonly | N | The database does not support setting default_transaction_read_only in a session |
| connections | Transaction control methods and attributes. | deferrable | Y | - |
| connections | Transaction control methods and attributes. | set_isolation_level(level ) | Y | - |
| connections | Transaction control methods and attributes. | encoding | Y | - |
| connections | Transaction control methods and attributes. | set_client_encoding(enc) | Y | - |
| connections | Transaction control methods and attributes. | notices | N | The database does not support LISTEN/NOTIFY |
| connections | Transaction control methods and attributes. | notifies | Y | - |
| connections | Transaction control methods and attributes. | cursor_factory | Y | - |
| connections | Transaction control methods and attributes. | info | Y | - |
| connections | Transaction control methods and attributes. | status | Y | - |
| connections | Transaction control methods and attributes. | lobject | N | The database does not support large-object operations |
| connections | Methods related to asynchronous support | poll() | Y | - |
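| connections | Methods related to asynchronous support | fileno() | Y | - |
| connections | Methods related to asynchronous support | isexecuting() | Y | - |
| connections | Interoperation with other C API modules | pgconn_ptr | Y | - |
| connections | Interoperation with other C API modules | get_native_connection() | Y | - |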
| connections | informative methods of the native connection | get_transaction_status() | Y | - |
| connections | informative methods of the native connection | protocol_version | Y | - |
| connections | informative methods of the native connection | server_version | Y | - |
| connections | informative methods of the native connection | get_backend_pid() | Y | Returns the ID of the logical connection, not the backend PID |
| connections | informative methods of the native connection | get_parameter_status(parameter) | Y | - |
| connections | informative methods of the native connection | get_dsn_parameters() | Y | - |
| cursor | basic | description | Y | - |
| cursor | basic | close() | Y | - |
| cursor | basic | closed | Y | - |
| cursor | basic | connection | Y | - |
| cursor | basic | name | Y | - |
| cursor | basic | scrollable | N | The database does not support SCROLL CURSOR |
| cursor | basic | withhold | N | A withhold cursor must be closed before commit |
| cursor | Commands execution methods | execute(query,vars=None) | Y | - |
| cursor | Commands execution methods | executemany(query,vars_list) | Y | - |
| cursor | Commands execution methods | callproc(procname[,parameters]) | Y | - |
| cursor | Commands execution methods | mogrify(operation[,parameters]) | Y | - |
| cursor | Commands execution methods | setinputsizes(sizes ) | Y | - |
| cursor | Commands execution methods | fetchone() | Y | - |
| cursor | Commands execution methods | fetchmany([size=cursor.arraysize]) | Y | - |
| cursor | Commands execution methods | fetchall() | Y | - |
| cursor | Commands execution methods | scroll(value[,mode='relative']) | N | The database does not support SCROLL CURSOR |
| cursor | Commands execution methods | arraysize | Y | - |
| cursor | Commands execution methods | itersize | Y | - |
| cursor | Commands execution methods | rowcount | Y | - |
| cursor | Commands execution methods | rownumber | Y | - |
| cursor | Commands execution methods | lastrowid | Y | - |
| cursor | Commands execution methods | query | Y | - |
| cursor | Commands execution methods | statusmessage | Y | - |
| cursor | Commands execution methods | cast(oid , s ) | Y | - |
| cursor | Commands execution methods | tzinfo_factory | Y | - |
| cursor | Commands execution methods | nextset() | Y | - |
| cursor | Commands execution methods | setoutputsize(size [, column ]) | Y | - |
| cursor | COPY-related methods | copy_from(file,table,sep='\t',null='\N',size=8192,columns=None) | Y | - |
| cursor | COPY-related methods | copy_to(file,table,sep='\t',null='\N',columns=None) | Y | - |
| cursor | COPY-related methods | copy_expert(sql,file,size=8192) | Y | - |
| cursor | Interoperation with other C API modules | pgresult_ptr | Y | - |
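The table marks `execute(query,vars=None)`, `executemany(query,vars_list)`, `commit()`, and `rollback()` as supported. As a hedged illustration of how these fit together, the sketch below inserts rows with bound parameters instead of string concatenation. The helper name `insert_rows` is hypothetical, and the table `test(id int, name text)` is assumed to match the sample scripts on this page.

```python
def insert_rows(connection, rows):
    """Insert (id, name) pairs into table test using bound parameters.

    `connection` is expected to behave like a psycopg2 connection.
    """
    cursor = connection.cursor()
    try:
        # %s placeholders are filled by the driver, which handles quoting.
        cursor.executemany("insert into test values (%s, %s);", rows)
        connection.commit()
    except Exception:
        # Leave the connection usable for later statements.
        connection.rollback()
        raise
    finally:
        cursor.close()


# Usage with a real connection (not executed here):
# import psycopg2
# conn = psycopg2.connect(host='10.154.70.231', port='8000',
#                         database='gaussdb', user='dbadmin',
#                         password='password')
# insert_rows(conn, [(4, 'number4'), (5, 'number5')])
```

Passing values separately from the SQL text keeps quoting in the driver and avoids building statements by hand.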
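Connecting to the cluster with psycopg2 on Linux
1. Log in to the Linux environment as the root user.
2. Run the following command to create the python_dws.py file.
vi python_dws.py
Copy and paste the following content into python_dws.py: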
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from __future__ import print_function

import psycopg2


def create_table(connection):
    print("Begin to create table")
    try:
        cursor = connection.cursor()
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("Table created successfully")
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    try:
        cursor = connection.cursor()
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("Insert data successfully")
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    try:
        cursor = connection.cursor()
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    try:
        cursor = connection.cursor()
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("After Delete, Operation done successfully")
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    try:
        cursor = connection.cursor()
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")
        cursor.close()


if __name__ == '__main__':
    try:
        conn = psycopg2.connect(host='10.154.70.231',
                                port='8000',
                                database='gaussdb',   # database to connect to
                                user='dbadmin',
                                password='password')  # database user password
    except psycopg2.DatabaseError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
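3. Based on the actual cluster information, change the cluster public network access address, cluster port number, database name, database username, and database password in python_dws.py.
The psycopg2 interface does not retry connections; implement retry handling in your application code.
conn = psycopg2.connect(host='10.154.70.231',
                        port='8000',
                        database='gaussdb',   # database to connect to
                        user='dbadmin',
                        password='password')  # database user password
4. Run the following command to connect to the cluster using the psycopg2 third-party library.
python python_dws.py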
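Step 3 notes that psycopg2 does not retry connections and that the application has to do so. One possible shape for that handling is sketched below. The helper `connect_with_retry`, its parameters, and the linear back-off are assumptions made for illustration, not something psycopg2 or DWS provides.

```python
import time


def connect_with_retry(connect, retries=3, delay=1.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call connect() until it succeeds or `retries` attempts have failed.

    connect  -- zero-argument callable that returns a connection
    retry_on -- exception types that should trigger another attempt
    sleep    -- injectable for testing; defaults to time.sleep
    """
    if retries < 1:
        raise ValueError("retries must be at least 1")
    for attempt in range(1, retries + 1):
        try:
            return connect()
        except retry_on:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            sleep(delay * attempt)  # linear back-off before the next attempt


# Usage with psycopg2 (values are the placeholders from the script above):
# import psycopg2
# conn = connect_with_retry(
#     lambda: psycopg2.connect(host='10.154.70.231', port='8000',
#                              database='gaussdb', user='dbadmin',
#                              password='password'),
#     retry_on=(psycopg2.OperationalError,))
```

Restricting `retry_on` to `psycopg2.OperationalError` keeps configuration mistakes, such as a mistyped keyword argument, from being retried pointlessly.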
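Connecting to the cluster with psycopg2 on Windows
1. On Windows, click the Start button, type cmd in the search box, and then click cmd.exe in the result list to open a Command Prompt window.
2. In the Command Prompt window, run the following command to create the python_dws.py file.
type nul> python_dws.py
Copy and paste the following content into python_dws.py: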
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from __future__ import print_function

import psycopg2


def create_table(connection):
    print("Begin to create table")
    try:
        cursor = connection.cursor()
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("Table created successfully")
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    try:
        cursor = connection.cursor()
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("Insert data successfully")
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    try:
        cursor = connection.cursor()
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("After Update, Operation done successfully")
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    try:
        cursor = connection.cursor()
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted :", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
    else:
        print("After Delete, Operation done successfully")
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    try:
        cursor = connection.cursor()
        cursor.execute("select * from test order by 1;")
        rows = cursor.fetchall()
        for row in rows:
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except psycopg2.ProgrammingError as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")
        cursor.close()


if __name__ == '__main__':
    try:
        conn = psycopg2.connect(host='10.154.70.231',
                                port='8000',
                                database='gaussdb',   # database to connect to
                                user='dbadmin',
                                password='password')  # database user password
    except psycopg2.DatabaseError as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
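3. Based on the actual cluster information, change the cluster public network access address, cluster port number, database name, database username, and database password in python_dws.py.
conn = psycopg2.connect(host='10.154.70.231',
                        port='8000',
                        database='gaussdb',   # database to connect to
                        user='dbadmin',
                        password='password')  # database user password
4. In the Command Prompt window, run the following command to connect to the cluster using the psycopg2 third-party library.
python python_dws.py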
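Step 3 in both procedures edits the connection values directly in the script. An alternative that keeps the password out of the file is to read the values from environment variables. The sketch below assumes hypothetical variable names (`DWS_HOST`, `DWS_PORT`, `DWS_DATABASE`, `DWS_USER`, `DWS_PASSWORD`); they are not defined by DWS or psycopg2, and the defaults simply mirror the sample script.

```python
import os


def load_dws_settings(env=None):
    """Build keyword arguments for psycopg2.connect from environment variables."""
    env = os.environ if env is None else env
    return {
        "host": env["DWS_HOST"],                        # required
        "port": env.get("DWS_PORT", "8000"),            # sample-script default
        "database": env.get("DWS_DATABASE", "gaussdb"),
        "user": env["DWS_USER"],                        # required
        "password": env["DWS_PASSWORD"],                # required
    }


# Usage (not executed here):
# import psycopg2
# conn = psycopg2.connect(**load_dws_settings())
```

A missing required variable raises `KeyError` before any connection attempt, which makes configuration mistakes easy to spot.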
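Why connections made with psycopg2 do not support the CN Retry feature
DWS can automatically retry an SQL statement that fails during execution (CN Retry for short). For SQL statements sent by clients and drivers, CN Retry identifies the error type when execution fails and retries the statement. However, a connection created with the default psycopg2 connection mode is not retried automatically when a statement fails; the error is reported and execution stops. For example, in the common primary/standby switchover scenario, the following error is reported when there is no automatic retry, whereas the correct result is returned if the switchover completes during automatic retry.
psycopg2.errors.ConnectionFailure: pooler: failed to create 1 connections, Error Message: remote node dn_6003_6004, detail: could not connect to server: Operation now in progress
Cause of the error:
1. psycopg2 sends a BEGIN statement to start a transaction before sending the SQL statement.
2. As a feature constraint, CN Retry does not support statements inside a transaction block.
Solutions:
- With a synchronous connection, explicitly end the transaction started by the driver.
cursor = conn.cursor()
# Add an end statement to explicitly end the transaction started by the driver
cursor.execute("end; select * from test order by 1;")
rows = cursor.fetchall()
- Use an asynchronous connection and start transactions explicitly. For details about asynchronous connections, see the psycopg official website.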
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import select

import psycopg2
import psycopg2.extensions


# wait function for asynchronous connections, as given in the official psycopg2 documentation
def wait(conn):
    while True:
        state = conn.poll()
        if state == psycopg2.extensions.POLL_OK:
            break
        elif state == psycopg2.extensions.POLL_WRITE:
            select.select([], [conn.fileno()], [])
        elif state == psycopg2.extensions.POLL_READ:
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % state)


def psycopg2_cnretry_async():
    # Create the connection
    conn = psycopg2.connect(host='10.154.70.231',
                            port='8000',
                            database='gaussdb',   # database to connect to
                            user='dbadmin',
                            password='password',  # database user password
                            async_=1)             # asynchronous mode; "async" is a reserved word in Python 3.7+
    wait(conn)

    # Run the query
    cursor = conn.cursor()
    cursor.execute("select * from test order by 1;")
    wait(conn)
    rows = cursor.fetchall()
    for row in rows:
        print(row[0], row[1])

    # Close the connection
    conn.close()


if __name__ == '__main__':
    psycopg2_cnretry_async()
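Because the cause described above is the BEGIN that psycopg2 sends on a synchronous connection, another approach worth testing is psycopg2's `autocommit` attribute, which the support table lists as available: with `autocommit` set to `True`, psycopg2 does not issue BEGIN before a statement. This page does not confirm that CN Retry takes effect in that mode, so treat the sketch below as an assumption to verify against your cluster. The helper name `query_in_autocommit` is hypothetical.

```python
def query_in_autocommit(connection, sql):
    """Run one query with autocommit enabled, then restore the previous mode.

    Call this only when no transaction is open on the connection:
    psycopg2 refuses to change autocommit inside a transaction.
    """
    previous = connection.autocommit
    connection.autocommit = True  # the driver no longer sends BEGIN first
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(sql)
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.autocommit = previous


# Usage (not executed here):
# import psycopg2
# conn = psycopg2.connect(host='10.154.70.231', port='8000',
#                         database='gaussdb', user='dbadmin',
#                         password='password')
# rows = query_in_autocommit(conn, "select * from test order by 1;")
```

Statements run this way are committed individually, so the helper does not suit work that must be atomic across several statements.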