云服务器内容精选
-
完整示例代码 通过DataFrame API访问 认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Set cross-source connection parameters url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the DWS table dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate() # Createa data table for DLI - associated DWS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS (\ 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(2,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_dws").show() # close session sparkSession.stop()
-
通过DataFrame API访问数据源 连接参数配置 1 2 3 4 5 url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres" dbtable = "customer" user = "dbadmin" password = "######" driver = "org.postgresql.Driver" 设置数据 1 dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)]) 设置schema 1 2 3 schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) 创建DataFrame 1 dataFrame = sparkSession.createDataFrame(dataList, schema) 保存数据到DWS 1 2 3 4 5 6 7 8 9 dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Overwrite") \ .save() mode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据 1 2 3 4 5 6 7 8 9 jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() 操作结果
-
通过SQL API 访问数据源 创建DLI跨源访问 dws 的关联表。 1 2 3 4 5 6 7 sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ( 'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',\ 'dbtable'='customer',\ 'user'='dbadmin',\ 'password'='######',\ 'driver'='org.postgresql.Driver')") 建表参数详情可参考表1。 插入数据 1 sparkSession.sql("insert into dli_to_dws values(2,'John',24)") 查询数据 1 jdbcDF = sparkSession.sql("select * from dli_to_dws").show() 操作结果
-
操作前准备 import相关依赖包 1 2 3 from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession 创建会话 1 sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
-
完整示例代码 通过SQL API访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import org.apache.spark.sql.SparkSession; public class java_rds { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-rds").getOrCreate(); // Create a data table for DLI-associated RDS sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS ('url'='jdbc:mysql://192.168.6.150:3306','dbtable'='test.customer','user'='root','password'='**','driver'='com.mysql.jdbc.Driver')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'Liu',21),(4,'Joey',34)"); //Read data from DLI data table sparkSession.sql("select * from dli_to_rds"); //drop table sparkSession.sql("drop table dli_to_rds"); sparkSession.close(); } }
-
完整示例代码 直接复制如下样例代码到py文件中后,需要注意文件内容中的“\”后面可能会有unexpected character的问题。需要将“\”后面的缩进或是空格全部删除。 通过DataFrame API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Set cross-source connection parameters. url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the RDS. dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop() 通过SQL API访问 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Createa data table for DLI - associated RDS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_rds") jdbcDF.show() # close session sparkSession.stop()
-
完整示例代码 通过SQL API访问MRS的OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql(\ "create table opentsdb_test using opentsdb options(\ 'Host'='10.0.0.171:4242',\ 'metric'='cts_opentsdb',\ 'tags'='city,location')") sparkSession.sql("insert into opentsdb_test values('aaa', 'abc', '2021-06-30 18:00:00', 30.0)") result = sparkSession.sql("SELECT * FROM opentsdb_test") result.show() # close session sparkSession.stop() 通过DataFrame API访问OpenTSDB # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-opentsdb").getOrCreate() # Create a DLI cross-source association opentsdb data table sparkSession.sql( "create table opentsdb_test using opentsdb options(\ 'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',\ 'metric'='ct_opentsdb',\ 'tags'='city,location')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("aaa", "abc", 123456L, 30.0)]) # Setting schema schema = StructType([StructField("location", StringType()),\ StructField("name", StringType()),\ StructField("timestamp", LongType()),\ StructField("value", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Set cross-source connection parameters metric = "ctopentsdb" tags = "city,location" Host = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242" # Write data to the cloudtable-opentsdb dataFrame.write.insertInto("opentsdb_test") # ******* Opentsdb does not currently implement the ctas method to save data, so the save() method cannot be used.******* # dataFrame.write.format("opentsdb").option("Host", Host).option("metric", metric).option("tags", tags).mode("Overwrite").save() # Read data on CloudTable-OpenTSDB jdbdDF = sparkSession.read\ .format("opentsdb")\ .option("Host",Host)\ .option("metric",metric)\ .option("tags",tags)\ .load() jdbdDF.show() # close session sparkSession.stop()
-
完整示例代码 通过SQL API访问MRS HBase 未开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city')") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 开启kerberos认证样例代码 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil import time import os if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext time.sleep(10) krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd() krb5_endfile = path_user + "/" + "krb5.conf" keytab_endfile = path_user + "/" + "user.keytab" shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20) sparkSession.sql( "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " + "using hbase OPTIONS(" + "'ZKHost'='10.0.0.146:2181'," + "'TableName'='hbtest'," + "'RowKey'='id:100'," + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," + "'krb5conf'='" + path_user + "/krb5.conf'," + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop() 通过DataFrame API访问HBase # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct sparkSession.sql(\ "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \ 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\ cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\ cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\ 'TableName' = 'table_DupRowkey1',\ 'RowKey' = 'id:5,location:6,city:7',\ 'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)]) # Setting schema schema = StructType([StructField("id", StringType()), StructField("location", StringType()), StructField("city", StringType()), StructField("booleanf", BooleanType()), StructField("shortf", ShortType()), StructField("intf", IntegerType()), StructField("longf", LongType()), StructField("floatf", FloatType()), StructField("doublef", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1" RowKey = "id:5,location:6,city:7" Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" # Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\ .format("hbase")\ .option("ZKHost", ZKHost)\ .option("TableName",TableName)\ .option("RowKey", RowKey)\ .option("Cols", Cols)\ .load() jdbcDF.filter("id = '12333' or id='11111'").show() # close session sparkSession.stop()
-
通过DataFrame API访问数据源 连接配置。 1 2 3 4 val url = "jdbc:postgresql://to-dws-1174405057-EA1Kgo8H.datasource.com:8000/postgres" val username = "dbadmin" val password = "######" val dbtable = "customer" 创建DataFrame,添加数据,并重命名字段。 1 2 3 4 var dataFrame_1 = sparkSession.createDataFrame(List((8, "Jack_1", 18))) val df = dataFrame_1.withColumnRenamed("_1", "id") .withColumnRenamed("_2", "name") .withColumnRenamed("_3", "age") 导入数据到DWS。 1 2 3 4 5 6 7 df.write.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .mode(SaveMode.Append) .save() SaveMode 有四种保存类型: ErrorIfExis:如果已经存在数据,则抛出异常。 Overwrite:如果已经存在数据,则覆盖原数据。 Append:如果已经存在数据,则追加保存。 Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。 读取DWS上的数据。 方式一:read.format()方法 1 2 3 4 5 6 val jdbcDF = sparkSession.read.format("jdbc") .option("url", url) .option("dbtable", dbtable) .option("user", username) .option("password", password) .load() 方式二:read.jdbc()方法 1 2 3 4 val properties = new Properties() properties.put("user", username) properties.put("password", password) val jdbcDF2 = sparkSession.read.jdbc(url, dbtable, properties) 插入数据前: 插入数据后: 使用上述read.format()或者read.jdbc()方法读取到的dateFrame注册为临时表,就可使用sql语句进行数据查询了。 1 2 jdbcDF.registerTempTable("customer_test") sparkSession.sql("select * from customer_test where id = 1").show() 查询结果:
-
完整示例代码 通过DataFrame API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-redis").getOrCreate() # Set cross-source connection parameters. host = "192.168.4.199" port = "6379" table = "person" auth = "######" # Create a DataFrame and initialize the DataFrame data. # ******* method noe ********* dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19),(2,"Tom",20)]) schema = StructType([StructField("id", IntegerType(), False),StructField("name", StringType(), False),StructField("age", IntegerType(), False)]) dataFrame_one = sparkSession.createDataFrame(dataList, schema) # ****** method two ****** # jdbcDF = sparkSession.createDataFrame([(3,"Jack", 23)]) # dataFrame = jdbcDF.withColumnRenamed("_1", "id").withColumnRenamed("_2", "name").withColumnRenamed("_3", "age") # Write data to the redis table dataFrame.write.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).mode("Overwrite").save() # Read data sparkSession.read.format("redis").option("host", host).option("port", port).option("table", table).option("password", auth).load().show() # close session sparkSession.stop() 通过SQL API 访问 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession sparkSession = SparkSession.builder.appName("datasource_redis").getOrCreate() sparkSession.sql( "CREATE TEMPORARY VIEW person (name STRING, age INT) USING org.apache.spark.sql.redis OPTIONS (\ 'host' = '192.168.4.199', \ 'port' = '6379',\ 'password' = '######',\ 'table'= 'person')".stripMargin); sparkSession.sql("INSERT INTO TABLE person VALUES ('John', 30),('Peter', 45)".stripMargin) sparkSession.sql("SELECT * FROM person".stripMargin).collect().foreach(println) # close session sparkSession.stop()
-
通过DataFrame API访问数据源 构造schema 1 2 3 4 5 6 7 8 9 10 val attrId = new StructField("id",StringType) val location = new StructField("location",StringType) val city = new StructField("city",StringType) val booleanf = new StructField("booleanf",BooleanType) val shortf = new StructField("shortf",ShortType) val intf = new StructField("intf",IntegerType) val longf = new StructField("longf",LongType) val floatf = new StructField("floatf",FloatType) val doublef = new StructField("doublef",DoubleType) val attrs = Array(attrId, location,city,booleanf,shortf,intf,longf,floatf,doublef) 根据schema的类型构造数据 1 2 val mutableRow: Seq[Any] = Seq("12345","abc","city1",false,null,3,23,2.3,2.34) val rddData: RDD[Row] = sparkSession.sparkContext.parallelize(Array(Row.fromSeq(mutableRow)), 1) 导入数据到HBase 1 sparkSession.createDataFrame(rddData, new StructType(attrs)).write.insertInto("test_hbase") 读取HBase上的数据 1 2 3 4 5 6 7 8 val map = new mutable.HashMap[String, String]() map("TableName") = "table_DupRowkey1" map("RowKey") = "id:5,location:6,city:7" map("Cols") = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" map("ZKHost")="cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181, cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" sparkSession.read.schema(new StructType(attrs)).format("hbase").options(map.toMap).load().show() 返回结果:
-
常见问题 Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决? java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。 Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决? ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}] A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。
-
环境准备 已在DLI控制台购买了通用队列。 已购买了DIS通道。开通DIS通道。 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。 若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
-
完整示例代码 通过SQL API 访问DWS表 import org.apache.spark.sql.SparkSession; public class java_dws { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-dws").getOrCreate(); sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); //*****************************SQL model*********************************** //Insert data into the DLI data table sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); //Read data from DLI data table sparkSession.sql("select * from dli_to_dws").show(); //drop table sparkSession.sql("drop table dli_to_dws"); sparkSession.close(); } }
-
通过SQL API 访问数据源 创建DLI跨源访问DWS的关联表,填写连接参数。 1 sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')"); 插入数据 1 sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')"); 查询数据 1 sparkSession.sql("select * from dli_to_dws").show(); 插入数据后:
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格