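After creating a data warehouse cluster, you can use the third-party PyGreSQL library to connect to it from Python and perform operations on tables in DWS.
Preparations Before Connecting to a Cluster
- The DWS cluster has been bound to an elastic IP address (EIP).
- You have obtained the username and password of the DWS cluster's database administrator.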
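Notice: MD5 has been proven vulnerable to hash collisions, so its use for password verification is strictly prohibited. DWS is secure by default and disables MD5 password verification, which may prevent open-source clients from connecting. Check whether the database parameter password_encryption_type is set to 1. If it is not, change it as described in "[Modifying Database Parameters](//www.daliqc.cn/document/10014061/10047788)" in the User Guide, and then change the password of the database user you plan to use once.
Note: For security reasons, DWS no longer stores password digests with MD5 by default, so open-source drivers and clients cannot connect to the database. To use the MD5 authentication method of the open-source protocol, adjust the password policy and then either create a new user or change the password of an existing user once.
The database never stores your plaintext password. It stores a hash digest of the password and, during authentication, compares it (with a salt applied) against the digest sent by the client. Consequently, when you change the password encryption policy, the database cannot recover your password to compute a digest with the new algorithm. You must change the password once or create a new user; the new password is then stored as a digest computed with the algorithm you configured and is used for subsequent connection authentication.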
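- You have obtained the public network address of the DWS cluster, including the IP address and port. For details, see Obtaining the Cluster Connection Address.
- PyGreSQL has been installed.
Note: On operating systems such as CentOS and Red Hat, install it with yum: yum install PyGreSQL.
PyGreSQL depends on the PostgreSQL libpq dynamic library, and their bitness must match (32-bit PyGreSQL requires 32-bit libpq, and 64-bit PyGreSQL requires 64-bit libpq). On Linux, yum resolves this dependency. On Windows, install libpq first in either of the following ways:
1. Install PostgreSQL and add the directory containing the libpq, ssl, and crypto dynamic libraries to the PATH environment variable.
2. Install psqlODBC and use the libpq, ssl, and crypto dynamic libraries shipped with the PostgreSQL ODBC driver.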
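The password-storage explanation above can be illustrated with the MD5 scheme that open-source PostgreSQL clients use. The following standard-library Python sketch (the function names are hypothetical) shows that a PostgreSQL-style server keeps only md5(password + username) and verifies a login by salting that stored digest, so it never needs the plaintext and has nothing from which to derive a digest for a different algorithm. It models only the open-source PostgreSQL MD5 exchange; the SHA-256 storage format that DWS uses by default is not shown.

```python
import hashlib
import hmac
import os


def stored_md5_digest(password, user):
    """Value a PostgreSQL-style server stores for an MD5 user: 'md5' + md5(password + user)."""
    return "md5" + hashlib.md5((password + user).encode("utf-8")).hexdigest()


def client_response(password, user, salt):
    """What the client sends after receiving the 4-byte random salt from the server."""
    inner = hashlib.md5((password + user).encode("utf-8")).hexdigest()
    return "md5" + hashlib.md5(inner.encode("ascii") + salt).hexdigest()


def server_verify(stored, salt, response):
    """Verify a login from the stored digest alone; the plaintext password is never used."""
    inner = stored[len("md5"):]  # strip the 'md5' prefix to get md5(password + user)
    expected = "md5" + hashlib.md5(inner.encode("ascii") + salt).hexdigest()
    return hmac.compare_digest(expected, response)  # constant-time comparison


if __name__ == "__main__":
    stored = stored_md5_digest("password", "dbadmin")
    salt = os.urandom(4)  # the server picks a new salt for every login attempt
    print(server_verify(stored, salt, client_response("password", "dbadmin", salt)))  # True
    print(server_verify(stored, salt, client_response("wrong", "dbadmin", salt)))  # False
```

Because the server holds only the digest, changing password_encryption_type cannot convert existing entries; resetting the password is what produces a digest in the new format.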
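Usage Constraints
PyGreSQL is a client interface designed for PostgreSQL, so DWS does not support all of its functions. The following table lists the support status.

Note: The support status below is based on Python 3.8.5 and PyGreSQL 5.2.4.
DWS support for the main PyGreSQL interfaces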
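To compare your environment with the versions used for the table below, a small sketch like the following can report the local Python and PyGreSQL versions. It reads the installed distribution's metadata with importlib.metadata (Python 3.8 and later) instead of importing pg, so it still runs when libpq cannot be loaded. The helper names are hypothetical, and it assumes PyGreSQL registered package metadata under the name "PyGreSQL" (as a pip installation does); installations that do not expose this metadata are reported as not found.

```python
import platform
import re
from importlib import metadata

# Versions the support table was verified against (from the note above).
TESTED_PYTHON = "3.8.5"
TESTED_PYGRESQL = "5.2.4"


def version_tuple(version):
    """Turn '5.2.4' into (5, 2, 4); stops at the first component without leading digits."""
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def installed_pygresql_version():
    """Return the installed PyGreSQL version string, or None if no metadata is found."""
    try:
        return metadata.version("PyGreSQL")
    except metadata.PackageNotFoundError:
        return None


def report():
    """Print how the local versions compare with the tested ones."""
    print("Python  :", platform.python_version(), "(tested: " + TESTED_PYTHON + ")")
    pygresql_version = installed_pygresql_version()
    if pygresql_version is None:
        print("PyGreSQL: not found via package metadata")
    elif version_tuple(pygresql_version) != version_tuple(TESTED_PYGRESQL):
        print("PyGreSQL:", pygresql_version, "differs from tested", TESTED_PYGRESQL)
    else:
        print("PyGreSQL:", pygresql_version, "(matches the tested version)")


if __name__ == "__main__":
    report()
```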
| PyGreSQL interface category | Interface | Supported | Remarks |
|---|---|---|---|
| **pg module** | | | |
| Module functions and constants | connect–Open a PostgreSQL connection | Y | - |
| Module functions and constants | get_pqlib_version–get the version of libpq | Y | - |
| Module functions and constants | get/set_defhost–default server host [DV] | Y | - |
| Module functions and constants | get/set_defport–default server port [DV] | Y | - |
| Module functions and constants | get/set_defopt–default connection options [DV] | Y | - |
| Module functions and constants | get/set_defbase–default database name [DV] | Y | - |
| Module functions and constants | get/set_defuser–default database user [DV] | Y | - |
| Module functions and constants | get/set_defpasswd–default database password [DV] | Y | - |
| Module functions and constants | escape_string–escape a string for use within SQL | Y | - |
| Module functions and constants | escape_bytea–escape binary data for use within SQL | Y | - |
| Module functions and constants | unescape_bytea–unescape data that has been retrieved as text | Y | - |
| Module functions and constants | get/set_namedresult–conversion to named tuples | Y | - |
| Module functions and constants | get/set_decimal–decimal type to be used for numeric values | Y | - |
| Module functions and constants | get/set_decimal_point–decimal mark used for monetary values | Y | - |
| Module functions and constants | get/set_bool–whether boolean values are returned as bool objects | Y | - |
| Module functions and constants | get/set_array–whether arrays are returned as list objects | Y | - |
| Module functions and constants | get/set_bytea_escaped–whether bytea data is returned escaped | Y | - |
| Module functions and constants | get/set_jsondecode–decoding JSON format | Y | - |
| Module functions and constants | get/set_cast_hook–fallback typecast function | Y | - |
| Module functions and constants | get/set_datestyle–assume a fixed date style | Y | - |
| Module functions and constants | get/set_typecast–custom typecasting | Y | - |
| Module functions and constants | cast_array/record–fast parsers for arrays and records | Y | - |
| Module functions and constants | Type helpers | Y | - |
| Module functions and constants | Module constants | Y | - |
| Connection–The connection object | query–execute a SQL command string | Y | - |
| Connection–The connection object | send_query–executes a SQL command string asynchronously | Y | - |
| Connection–The connection object | query_prepared–execute a prepared statement | Y | - |
| Connection–The connection object | prepare–create a prepared statement | Y | - |
| Connection–The connection object | describe_prepared–describe a prepared statement | Y | - |
| Connection–The connection object | reset–reset the connection | Y | - |
| Connection–The connection object | poll–completes an asynchronous connection | Y | - |
| Connection–The connection object | cancel–abandon processing of current SQL command | Y | - |
| Connection–The connection object | close–close the database connection | Y | - |
| Connection–The connection object | transaction–get the current transaction state | Y | - |
| Connection–The connection object | parameter–get a current server parameter setting | Y | - |
| Connection–The connection object | date_format–get the currently used date format | Y | - |
| Connection–The connection object | fileno–get the socket used to connect to the database | Y | - |
| Connection–The connection object | set_non_blocking–set the non-blocking status of the connection | Y | - |
| Connection–The connection object | is_non_blocking–report the blocking status of the connection | Y | - |
| Connection–The connection object | getnotify–get the last notify from the server | N | The database does not support LISTEN/NOTIFY. |
| Connection–The connection object | inserttable–insert a list into a table | Y | If the data for the COPY command contains \n, enclose that field in double quotes. |
| Connection–The connection object | get/set_notice_receiver–custom notice receiver | Y | - |
| Connection–The connection object | putline–write a line to the server socket [DA] | Y | - |
| Connection–The connection object | getline–get a line from server socket [DA] | Y | - |
| Connection–The connection object | endcopy–synchronize client and server [DA] | Y | - |
| Connection–The connection object | locreate–create a large object in the database [LO] | N | Large object operations are not supported. |
| Connection–The connection object | getlo–build a large object from given oid [LO] | N | Large object operations are not supported. |
| Connection–The connection object | loimport–import a file to a large object [LO] | N | Large object operations are not supported. |
| Connection–The connection object | Object attributes | Y | - |
| The DB wrapper class | Initialization | Y | - |
| The DB wrapper class | pkey–return the primary key of a table | Y | - |
| The DB wrapper class | get_databases–get list of databases in the system | Y | - |
| The DB wrapper class | get_relations–get list of relations in connected database | Y | - |
| The DB wrapper class | get_tables–get list of tables in connected database | Y | - |
| The DB wrapper class | get_attnames–get the attribute names of a table | Y | - |
| The DB wrapper class | has_table_privilege–check table privilege | Y | - |
| The DB wrapper class | get/set_parameter–get or set run-time parameters | Y | - |
| The DB wrapper class | begin/commit/rollback/savepoint/release–transaction handling | Y | - |
| The DB wrapper class | get–get a row from a database table or view | Y | - |
| The DB wrapper class | insert–insert a row into a database table | Y | - |
| The DB wrapper class | update–update a row in a database table | Y | - |
| The DB wrapper class | upsert–insert a row with conflict resolution | Y | - |
| The DB wrapper class | query–execute a SQL command string | Y | - |
| The DB wrapper class | query_formatted–execute a formatted SQL command string | Y | - |
| The DB wrapper class | query_prepared–execute a prepared statement | Y | - |
| The DB wrapper class | prepare–create a prepared statement | Y | - |
| The DB wrapper class | describe_prepared–describe a prepared statement | Y | - |
| The DB wrapper class | delete_prepared–delete a prepared statement | Y | - |
| The DB wrapper class | clear–clear row values in memory | Y | - |
| The DB wrapper class | delete–delete a row from a database table | Y | The row must be identifiable by a unique key or primary key. |
| The DB wrapper class | truncate–quickly empty database tables | Y | - |
| The DB wrapper class | get_as_list/dict–read a table as a list or dictionary | Y | - |
| The DB wrapper class | escape_literal/identifier/string/bytea–escape for SQL | Y | - |
| The DB wrapper class | unescape_bytea–unescape data retrieved from the database | Y | - |
| The DB wrapper class | encode/decode_json–encode and decode JSON data | Y | - |
| The DB wrapper class | use_regtypes–determine use of regular type names | Y | - |
| The DB wrapper class | notification_handler–create a notification handler | N | The database does not support LISTEN/NOTIFY. |
| The DB wrapper class | Attributes of the DB wrapper class | Y | - |
| Query methods | getresult–get query values as list of tuples | Y | - |
| Query methods | dictresult/dictiter–get query values as dictionaries | Y | - |
| Query methods | namedresult/namediter–get query values as named tuples | Y | - |
| Query methods | scalarresult/scalariter–get query values as scalars | Y | - |
| Query methods | one/onedict/onenamed/onescalar–get one result of a query | Y | - |
| Query methods | single/singledict/singlenamed/singlescalar–get single result of a query | Y | - |
| Query methods | listfields–list fields names of previous query result | Y | - |
| Query methods | fieldname, fieldnum–field name/number conversion | Y | - |
| Query methods | fieldinfo–detailed info about query result fields | Y | - |
| Query methods | ntuples–return number of tuples in query object | Y | - |
| Query methods | memsize–return number of bytes allocated by query result | Y | - |
| LargeObject–Large Objects | open–open a large object | N | Large object operations are not supported. |
| LargeObject–Large Objects | close–close a large object | N | Large object operations are not supported. |
| LargeObject–Large Objects | read, write, tell, seek, unlink–file-like large object handling | N | Large object operations are not supported. |
| LargeObject–Large Objects | size–get the large object size | N | Large object operations are not supported. |
| LargeObject–Large Objects | export–save a large object to a file | N | Large object operations are not supported. |
| LargeObject–Large Objects | Object attributes | N | Large object operations are not supported. |
| The Notification Handler | Instantiating the notification handler | N | The database does not support LISTEN/NOTIFY. |
| The Notification Handler | Invoking the notification handler | N | The database does not support LISTEN/NOTIFY. |
| The Notification Handler | Sending notifications | N | The database does not support LISTEN/NOTIFY. |
| The Notification Handler | Auxiliary methods | N | The database does not support LISTEN/NOTIFY. |
| **pgdb module** | | | |
| Module functions and constants | connect–Open a PostgreSQL connection | Y | - |
| Module functions and constants | get/set/reset_typecast–Control the global typecast functions | Y | - |
| Module functions and constants | Module constants | Y | - |
| Module functions and constants | Errors raised by this module | Y | - |
| Connection–The connection object | close–close the connection | Y | - |
| Connection–The connection object | commit–commit the connection | Y | - |
| Connection–The connection object | rollback–roll back the connection | Y | - |
| Connection–The connection object | cursor–return a new cursor object | Y | - |
| Connection–The connection object | Attributes that are not part of the standard | Y | - |
| Cursor–The cursor object | description–details regarding the result columns | Y | - |
| Cursor–The cursor object | rowcount–number of rows of the result | Y | - |
| Cursor–The cursor object | close–close the cursor | Y | - |
| Cursor–The cursor object | execute–execute a database operation | Y | - |
| Cursor–The cursor object | executemany–execute many similar database operations | Y | - |
| Cursor–The cursor object | callproc–Call a stored procedure | Y | - |
| Cursor–The cursor object | fetchone–fetch next row of the query result | Y | - |
| Cursor–The cursor object | fetchmany–fetch next set of rows of the query result | Y | - |
| Cursor–The cursor object | fetchall–fetch all rows of the query result | Y | - |
| Cursor–The cursor object | arraysize–the number of rows to fetch at a time | Y | - |
| Cursor–The cursor object | Methods and attributes that are not part of the standard | Y | - |
| Type–Type objects and constructors | Type constructors | Y | - |
| Type–Type objects and constructors | Type objects | Y | - |
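
The pgdb rows above (connect, cursor, execute, commit, rollback, close) follow the Python DB-API 2.0 (PEP 249). In pgdb, the first statement implicitly opens a transaction, and in PostgreSQL-compatible databases a transaction in which a statement has failed must be rolled back before the connection can be used again. The sketch below shows this commit-or-rollback pattern with a hypothetical helper, run_in_transaction. So that it runs without a cluster, it uses Python's built-in sqlite3 module as a stand-in DB-API connection; with PyGreSQL you would pass a pgdb connection instead and catch pgdb.Error.

```python
import sqlite3


def run_in_transaction(connection, statements):
    """Run statements as one transaction on any DB-API 2.0 connection.

    Commits if every statement succeeds; otherwise rolls back and re-raises,
    so the connection is usable again for the next transaction.
    """
    cursor = connection.cursor()
    try:
        for sql in statements:
            cursor.execute(sql)
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cursor.close()


if __name__ == "__main__":
    # In-memory sqlite3 stands in for pgdb so the sketch runs without a cluster.
    conn = sqlite3.connect(":memory:")
    run_in_transaction(conn, ["create table test(id int, name text)"])
    run_in_transaction(conn, ["insert into test values(1, 'number1')",
                              "insert into test values(2, 'number2')"])
    try:
        run_in_transaction(conn, ["insert into test values(3, 'number3')",
                                  "insert into no_such_table values(1)"])
    except sqlite3.Error as exc:
        print("rolled back:", exc)
    cur = conn.cursor()
    cur.execute("select count(*) from test")
    print(cur.fetchone()[0])  # 2: the failed transaction left no rows behind
    conn.close()
```

Catching a broad Exception inside the helper is deliberate: it guarantees the rollback for any error and then re-raises the error unchanged for the caller to handle.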
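Connecting to a Cluster Using PyGreSQL on Linux
1. Log in to the Linux environment as user root.
2. Run the following command to create the python_dws.py file:
vi python_dws.py
Copy and paste the following content into python_dws.py: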
#!/usr/bin/env python3
# _*_ encoding:utf-8 _*_
from __future__ import print_function
import pg


def create_table(connection):
    print("Begin to create table")
    try:
        connection.query("drop table if exists test;"
                         "create table test(id int, name text);")
    except pg.Error as e:  # pg.Error is the base class of all PyGreSQL database errors
        print(e)
    else:
        print("Table created successfully")


def insert_data(connection):
    print("Begin to insert data")
    try:
        connection.query("insert into test values(1,'number1');")
        connection.query("insert into test values(2,'number2');")
        connection.query("insert into test values(3,'number3');")
    except pg.Error as e:
        print(e)
    else:
        print("Insert data successfully")


def update_data(connection):
    print("Begin to update data")
    try:
        # For UPDATE/DELETE, query() returns the number of affected rows as a string
        result = connection.query("update test set name = 'numberupdated' where id=1;")
        print("Total number of rows updated:", result)
        result = connection.query("select * from test order by 1;")
        for row in result.getresult():  # getresult() returns a list of tuples
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.Error as e:
        print(e)
    else:
        print("After Update, Operation done successfully")


def delete_data(connection):
    print("Begin to delete data")
    try:
        result = connection.query("delete from test where id=3;")
        print("Total number of rows deleted:", result)
        result = connection.query("select * from test order by 1;")
        for row in result.getresult():
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pg.Error as e:
        print(e)
    else:
        print("After Delete, Operation done successfully")


def select_data(connection):
    print("Begin to select data")
    try:
        result = connection.query("select * from test order by 1;")
        for row in result.getresult():
            print("id = ", row[0])
            print("name = ", row[1])
    except pg.Error as e:
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")


if __name__ == '__main__':
    try:
        conn = pg.DB(host='10.154.70.231',
                     port=8000,
                     dbname='gaussdb',  # database to connect to
                     user='dbadmin',
                     passwd='password')  # password of the database user
    except pg.Error as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
Alternatively, implement it with the DB-API (pgdb) interface:
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
from __future__ import print_function
import pgdb


def create_table(connection):
    print("Begin to create table")
    cursor = connection.cursor()
    try:
        cursor.execute("drop table if exists test;"
                       "create table test(id int, name text);")
        connection.commit()
    except pgdb.Error as e:
        connection.rollback()  # pgdb opens a transaction implicitly; end it after an error
        print(e)
    else:
        print("Table created successfully")
    finally:
        cursor.close()


def insert_data(connection):
    print("Begin to insert data")
    cursor = connection.cursor()
    try:
        cursor.execute("insert into test values(1,'number1');")
        cursor.execute("insert into test values(2,'number2');")
        cursor.execute("insert into test values(3,'number3');")
        connection.commit()
    except pgdb.Error as e:
        connection.rollback()
        print(e)
    else:
        print("Insert data successfully")
    finally:
        cursor.close()


def update_data(connection):
    print("Begin to update data")
    cursor = connection.cursor()
    try:
        cursor.execute("update test set name = 'numberupdated' where id=1;")
        connection.commit()
        print("Total number of rows updated:", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        for row in cursor.fetchall():
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pgdb.Error as e:
        connection.rollback()
        print(e)
    else:
        print("After Update, Operation done successfully")
    finally:
        cursor.close()


def delete_data(connection):
    print("Begin to delete data")
    cursor = connection.cursor()
    try:
        cursor.execute("delete from test where id=3;")
        connection.commit()
        print("Total number of rows deleted:", cursor.rowcount)
        cursor.execute("select * from test order by 1;")
        for row in cursor.fetchall():
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pgdb.Error as e:
        connection.rollback()
        print(e)
    else:
        print("After Delete, Operation done successfully")
    finally:
        cursor.close()


def select_data(connection):
    print("Begin to select data")
    cursor = connection.cursor()
    try:
        cursor.execute("select * from test order by 1;")
        for row in cursor.fetchall():
            print("id = ", row[0])
            print("name = ", row[1], "\n")
    except pgdb.Error as e:
        connection.rollback()
        print(e)
        print("select failed")
    else:
        print("Operation done successfully")
    finally:
        cursor.close()


if __name__ == '__main__':
    try:
        conn = pgdb.connect(host='10.154.70.231',
                            port='8000',
                            database='gaussdb',  # database to connect to
                            user='dbadmin',
                            password='password')  # password of the database user
    except pgdb.Error as ex:
        print(ex)
        print("Connect database failed")
    else:
        print("Opened database successfully")
        create_table(conn)
        insert_data(conn)
        select_data(conn)
        update_data(conn)
        delete_data(conn)
        conn.close()
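3. Modify the cluster public network address, port, database name, database username, and database password in python_dws.py to match your cluster. In the DB-API version they are the following arguments (in the pg version, the database name and password arguments are dbname and passwd):
conn = pgdb.connect(host='10.154.70.231',
                    port='8000',
                    database='gaussdb',  # database to connect to
                    user='dbadmin',
                    password='password')  # password of the database user
Note
PyGreSQL does not retry failed connections automatically. Implement retry handling in your application code.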
4. Run the following command to connect to the cluster using PyGreSQL:
python python_dws.py
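Because PyGreSQL does not retry failed connections, retry handling has to live in the application code. The following is a minimal sketch of one common approach, retrying with exponential backoff. connect_with_retry and its parameters are hypothetical names, and treating every exception in retry_on as transient is an assumption you should narrow, for example to pgdb.Error or pg.InternalError. The connect argument is any zero-argument callable, so the demo uses a simulated connection that fails twice.

```python
import time


def connect_with_retry(connect, attempts=3, initial_delay=1.0, backoff=2.0,
                       retry_on=(Exception,), sleep=time.sleep):
    """Call connect() until it succeeds, making at most `attempts` calls.

    After each failure the wait time is multiplied by `backoff`. If the final
    attempt also fails, its exception is re-raised to the caller.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on as exc:
            if attempt == attempts:
                raise
            print("Connect attempt %d failed (%s); retrying in %.1f s" % (attempt, exc, delay))
            sleep(delay)
            delay *= backoff


if __name__ == "__main__":
    # Simulated connect() that fails twice before succeeding (stands in for pgdb.connect).
    state = {"calls": 0}

    def flaky_connect():
        state["calls"] += 1
        if state["calls"] < 3:
            raise ConnectionError("cluster not reachable")
        return "connection"

    print(connect_with_retry(flaky_connect, initial_delay=0.1))

    # With PyGreSQL installed, the same helper could wrap the real call, for example:
    # import functools, pgdb
    # conn = connect_with_retry(
    #     functools.partial(pgdb.connect, host='10.154.70.231', port='8000',
    #                       database='gaussdb', user='dbadmin', password='password'),
    #     retry_on=(pgdb.Error,))
```

The sleep function is a parameter only so that the waits can be replaced (for example in tests); in normal use the default time.sleep applies.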
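Connecting to a Cluster Using PyGreSQL on Windows
1. On Windows, click Start, type cmd in the search box, and click cmd.exe in the results to open a Command Prompt window.
2. In the Command Prompt window, run the following command to create the python_dws.py file:
type nul > python_dws.py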
Copy and paste the same script as in step 2 of the Linux procedure above into python_dws.py, using either the pg module version or the DB-API (pgdb) version.
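3. Modify the cluster public network address, port, database name, database username, and database password in python_dws.py to match your cluster. In the DB-API version they are the following arguments (in the pg version, the database name and password arguments are dbname and passwd):
conn = pgdb.connect(host='10.154.70.231',
                    port='8000',
                    database='gaussdb',  # database to connect to
                    user='dbadmin',
                    password='password')  # password of the database user
Note
PyGreSQL does not retry failed connections automatically. Implement retry handling in your application code.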
4. Run the following command to connect to the cluster using PyGreSQL:
python python_dws.py
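On Windows, one possible cause of failure at this step is that PyGreSQL cannot load libpq, for example because its directory is not on PATH or because its bitness differs from that of Python. The following standard-library sketch (function names are hypothetical) prints the interpreter's bitness, looks for libpq with ctypes.util.find_library (which on Windows searches the directories on PATH), and tries to load it and call PQlibVersion, the libpq function that reports the library version. Loading a DLL built for a different bitness raises OSError, which the sketch reports. Once PyGreSQL imports successfully, pg.get_pqlib_version() from the table above returns the same version number.

```python
import ctypes
import ctypes.util
import platform
import struct


def interpreter_bits():
    """Bitness of the running Python interpreter (32 or 64), from its pointer size."""
    return struct.calcsize("P") * 8


def find_libpq():
    """Return the libpq name or path the loader can see, or None if it is not found."""
    # 'libpq' matches libpq.dll on Windows; 'pq' matches libpq.so.* on Linux.
    for name in ("libpq", "pq"):
        found = ctypes.util.find_library(name)
        if found:
            return found
    return None


def libpq_version(path):
    """Load libpq and return PQlibVersion(), for example 120003 for libpq 12.3."""
    lib = ctypes.CDLL(path)  # raises OSError if the library cannot be loaded (e.g. bitness mismatch)
    lib.PQlibVersion.restype = ctypes.c_int
    lib.PQlibVersion.argtypes = []
    return lib.PQlibVersion()


if __name__ == "__main__":
    print("Python %s, %d-bit" % (platform.python_version(), interpreter_bits()))
    path = find_libpq()
    if path is None:
        print("libpq not found; add its directory to PATH (Windows) or install it (Linux)")
    else:
        try:
            print("libpq found at %s, PQlibVersion() = %d" % (path, libpq_version(path)))
        except (OSError, AttributeError) as exc:
            print("libpq found at %s but could not be used: %s" % (path, exc))
```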