数据仓库服务 GAUSSDB(DWS)-使用Python第三方库psycopg2连接GaussDB(DWS)集群:在Linux环境使用psycopg2第三方库连接集群
时间:2025-04-30 16:37:27
在Linux环境使用psycopg2第三方库连接集群
- 以root用户登录Linux环境。
- 执行以下命令创建python_dws.py文件。
vi python_dws.py
请复制粘贴以下内容放入python_dws.py文件中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
#!/usr/bin/python # -*- coding: UTF-8 -*- from __future__ import print_function import psycopg2 def create_table(connection): print("Begin to create table") try: cursor = connection.cursor() cursor.execute("drop table if exists test;" "create table test(id int, name text);") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Table created successfully") cursor.close() def insert_data(connection): print("Begin to insert data") try: cursor = connection.cursor() cursor.execute("insert into test values(1,'number1');") cursor.execute("insert into test values(2,'number2');") cursor.execute("insert into test values(3,'number3');") connection.commit() except psycopg2.ProgrammingError as e: print(e) else: print("Insert data successfully") cursor.close() def update_data(connection): print("Begin to update data") try: cursor = connection.cursor() cursor.execute("update test set name = 'numberupdated' where id=1;") connection.commit() print("Total number of rows updated :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Update, Operation done successfully") def delete_data(connection): print("Begin to delete data") try: cursor = connection.cursor() cursor.execute("delete from test where id=3;") connection.commit() print("Total number of rows deleted :", cursor.rowcount) cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) else: print("After Delete,Operation done successfully") def select_data(connection): print("Begin to select data") try: cursor = connection.cursor() cursor.execute("select * from test order by 1;") rows = cursor.fetchall() for row in rows: print("id = ", row[0]) print("name = ", row[1], "\n") except psycopg2.ProgrammingError as e: print(e) print("select failed") else: print("Operation done successfully") cursor.close() if __name__ == '__main__': try: conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码 except psycopg2.DatabaseError as ex: print(ex) print("Connect database failed") else: print("Opened database successfully") create_table(conn) insert_data(conn) select_data(conn) update_data(conn) delete_data(conn) conn.close()
- 按照实际集群信息,修改python_dws.py文件中的集群公网访问地址、集群端口号、数据库名称、数据库用户名、数据库密码。
psycopg2接口不提供重试连接的能力,您需要在业务代码中实现重试处理。
1 2 3 4 5
conn = psycopg2.connect(host='10.154.70.231', port='8000', database='gaussdb', # 需要连接的database user='dbadmin', password='password') # 数据库用户密码
- 执行以下命令,使用psycopg第三方库连接集群。
python python_dws.py
support.huaweicloud.com/mgtg-dws/dws_01_0120.html
推荐文章