云服务器内容精选

  • 完整示例代码 通过DataFrame API访问 认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Set cross-source connection parameters url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the DWS table dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Createa data table for DLI - associated DWS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\ 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(2,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_dws").show() # close session sparkSession.stop()
  • 通过DataFrame API访问数据源 连接参数配置 1 2 3 4 5 url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" 设置数据 1 dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) 设置schema 1 2 3 schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) 创建DataFrame 1 dataFrame = sparkSession.createDataFrame(dataList, schema) 保存数据到DWS 1 2 3 4 5 6 7 8 9 dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() mode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据 1 2 3 4 5 6 7 8 9 jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() 操作结果
  • 通过SQL API 访问数据源 创建DLI跨源访问 dws 的关联表。 1 2 3 4 5 6 7 sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ( 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") 建表参数详情可参考表1。 插入数据 1 sparkSession.sql("insert into dli_to_dws values(2,'John',24)") 查询数据 1 jdbcDF = sparkSession.sql("select * from dli_to_dws").show() 操作结果
  • 操作前准备 import相关依赖包 1 2 3 from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession 创建会话 1 sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
  • 完整示例代码 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import org.apache.spark.sql.SparkSession; public class java_rds { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-rds").getOrCreate(); // Create a data table for DLI-associated RDS sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS ('url'='jdbc:mysql://192.168.6.150:3306','dbtable'='test.customer','user'='root','password'='**','driver'='com.mysql.jdbc.Driver')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'Liu',21),(4,'Joey',34)"); //Read data from DLI data table sparkSession.sql("select * from dli_to_rds"); //drop table sparkSession.sql("drop table dli_to_rds"); sparkSession.close(); } }
  • 完整示例代码 直接复制如下样例代码到py文件中后,需要注意文件内容中的“\”后面可能会有unexpected character的问题。需要将“\”后面的缩进或是空格全部删除。 通过DataFrame API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Set cross-source connection parameters. url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the RDS. dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Createa data table for DLI - associated RDS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_rds") jdbcDF.show() # close session sparkSession.stop()
  • 完整示例代码 通过SQL API访问MRS的OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql(\ "create table opentsdb_test using opentsdb options(\ 'Host'='10.0.0.171:4242',\ 'metric'='cts_opentsdb',\ 'tags'='city,location')") sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)") result = sparkSession.sql("SELECT * FROM opentsdb_test") result.show() # close session sparkSession.stop() 通过DataFrame API访问OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql( "create table opentsdb_test using opentsdb options(\ 'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',\ 'metric'='ct_opentsdb',\ 'tags'='city,location')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456L, 30.0)]) # Setting schema schema = StructType([StructField("location", StringType()),\ StructField("name", StringType()),\ StructField("timestamp", LongType()),\ StructField("value", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Set cross-source connection parameters metric = "ctopentsdb" tags = "city,location" Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242" # Write data to the cloudtable-opentsdb dataFrame.write.insertInto("opentsdb_test") # ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.******* # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save() # Read data on CloudTable-OpenTSDB jdbdDF = sparkSession.read\ .format("opentsdb")\ .option("Host",Host)\ .option("metric",metric)\ .option("tags",tags)\ .load() jdbdDF.show() # close session sparkSession.stop()
  • 完整示例代码 通过SQL API访问MRS HBase 未开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city')") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil import time import os if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext time.sleep(10) krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd() krb5_endfile = path_user + "/" + "krb5.conf" keytab_endfile = path_user + "/" + "user.keytab" shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20) sparkSession.sql( "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " + "using hbase OPTIONS(" + "'ZKHost'='10.0.0.146:2181'," + "'TableName'='hbtest'," + "'RowKey'='id:100'," + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," + "'krb5conf'='" + path_user + "/krb5.conf'," + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 通过DataFrame API访问HBase # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct sparkSession.sql(\ "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \ 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\ cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\ cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\ 'TableName' = 'table_DupRowkey1',\ 'RowKey' = 'id:5,location:6,city:7',\ 'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)]) # Setting schema schema = StructType([StructField("id", StringType()), StructField("location", StringType()), StructField("city", StringType()), StructField("booleanf", BooleanType()), StructField("shortf", ShortType()), StructField("intf", IntegerType()), StructField("longf", LongType()), StructField("floatf", FloatType()), StructField("doublef", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1" RowKey = "id:5,location:6,city:7" Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" # Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\ .format("hbase")\ .option("ZKHost", ZKHost)\ .option("TableName",TableName)\ .option("RowKey", RowKey)\ .option("Cols", Cols)\ .load() jdbcDF.filter("id = '12333' or id='11111'").show() # close session sparkSession.stop()
  • 通过DataFrame API访问数据源 连接配置。 1 2 3 4 val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres" val username = "dbadmin" val password = "######" val dbtable = "customer" 创建DataFrame,添加数据,并重命名字段。 1 2 3 4 var dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18))) val df = dataFrame_1.withColumnRenamed("_1", "id") .withColumnRenamed("_2", "name") .withColumnRenamed("_3", "age") 导入数据到DWS。 1 2 3 4 5 6 7 df.write.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .mode(SaveMode.Append) .save() SaveMode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据。 方式一:read.format()方法 1 2 3 4 5 6 val jdbcDF = sparkSession.read.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .load() 方式二:read.jdbc()方法 1 2 3 4 val properties = new Properties() properties.put("user", username) properties.put("password", password) val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties) 插入数据前: 插入数据后: 使用上述read.format()或者read.jdbc()方法读取到的dateFrame注册为临时表,就可使用sql语句进行数据查询了。 1 2 jdbcDF.registerTempTable("customer_test") sparkSession.sql("select * from customer_test where id = 1").show() 查询结果:
  • 完整示例代码 通过DataFrame API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate() # Set cross-source connection parameters. host = "192.168.4.199" port = "6379" table = "person" auth = "######" # Create a DataFrame and initialize the DataFrame data. # ******* method noe ********* dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False),StructField("age", IntegerType(), False)]) dataFrame_one = sparkSession.createDataFrame(dataList, schema) # ****** method two ****** # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)]) # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age") # Write data to the redis table dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save() # Read data sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show() # close session sparkSession.stop() 通过SQL API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate() sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\ 'host' = '192.168.4.199', \ 'port' = '6379',\ 'password' = '######',\ 'table'= 'person')".stripMargin); sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println) # close session sparkSession.stop()
  • 通过DataFrame API访问数据源 构造schema 1 2 3 4 5 6 7 8 9 10 val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType) val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType) val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) 根据schema的类型构造数据 1 2 val mutableRow: Seq[Any] = Seq("12345","abc","city1",false,null,3,23,2.3,2.34) val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1) 导入数据到HBase 1 sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") 读取HBase上的数据 1 2 3 4 5 6 7 8 val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1" map("RowKey") = "id:5,location:6,city:7" map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show() 返回结果:
  • 常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
  • 环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
  • 完整示例代码 通过SQL API 访问DWS表 import org.apache.spark.sql.SparkSession; public class java_dws { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-dws").getOrCreate(); sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); //Read data from DLI data table sparkSession.sql("select * from dli_to_dws").show(); //drop table sparkSession.sql("drop table dli_to_dws"); sparkSession.close(); } }
  • 通过SQL API 访问数据源 创建DLI跨源访问DWS的关联表,填写连接参数。 1 sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); 插入数据 1 sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); 查询数据 1 sparkSession.sql("select * from dli_to_dws").show(); 插入数据后: