云服务器内容精选
-
RDS for SQL Server 2019功能概览 类别 功能 版本 web版 标准版 企业版 单机 单机 主备 集群版 实例管理 购买实例 √ √ √ √ 重启实例 √ √ √ √ 实例续费 √ √ √ √ 变更计费方式 按需实例转包周期 包周期实例转按需 √ √ √ √ 变更实例的CPU和规格 √ √ √ √ 重置密码 √ √ √ √ 单机转主备 × √ × × 磁盘扩容 √ √ √ √ 手动切换主备实例 × × √ √ 删除按需实例 √ √ √ √ 升级引擎版本 × × × × 创建只读实例 × × × √ 修改实例名称 √ √ √ √ 修改备注 √ √ √ √ 创建指定实例字符集 √ √ √ √ 设置可维护时间段 √ √ √ √ 购买相同配置实例 √ √ √ √ 标签 管理标签 √ √ √ √ 任务中心 任务中心 √ √ √ √ 数据迁移 DRS备份迁移 √ √ √ √ 数据库连接 VPC内访问(内网访问) √ √ √ √ 公网访问 √ √ √ √ 修改内网地址 √ √ √ √ 修改数据库端口 √ √ √ √ 备份恢复 全量备份(自动备份) √ √ √ √ 增量备份 √ √ √ √ 下载备份 √ √ √ √ 复制备份 √ √ √ √ 自动备份策略 √ √ √ √ 通过备份文件恢复到RDS for SQL Server √ √ √ √ 恢复实例到指定时间点 √ √ √ √ 手动备份 √ √ √ √ 恢复到新实例 √ √ √ √ 恢复到已有实例 √ √ √ √ 设置跨区域备份策略 √ √ √ √ 监控与告警 资源监控 √ √ √ √ 引擎监控 √ √ √ √ 设置告警规则 √ √ √ √ 参数管理 创建参数模板 √ √ √ √ 比较参数模板 √ √ √ √ 应用参数模板 √ √ √ √ 查看参数模板应用记录 √ √ √ √ 编辑参数模板 √ √ √ √ 导出参数模板 √ √ √ √ 查看参数修改历史 √ √ √ √ 日志管理 查看或下载运行日志 √ √ √ √ 查看或下载慢日志 √ √ √ √ 安全管理 云审计服务 √ √ √ √ 数据库审计(DBSS) √ √ √ √ 服务端加密 √ √ √ √ 数据库安全 √ √ √ √ 修改实例安全组 √ √ √ √ 开启透明数据加密(TDE) × √ √ √ 回收站 设置回收站策略 √ √ √ √ 重建实例 √ √ √ √ 引擎主要功能 账号管理 √ √ √ √ 数据库管理 √ √ √ √ 加域实例创建与访问 √ √ √ √ 分布式事务(MSDTC) √ √ √ √ 集成服务(SSIS) × × × √ 文件流(FileStream) √ × × √ 开启CLR集成功能 √ √ √ √ 创建和配置Agent Job和Dblink √ √ √ √ 存储过程相关 修改自定义数据库名称 √ √ √ √ 变更数据捕获(CDC) × √ √ √ 收缩数据库 √ √ √ √ 数据库复制 √ √ √ √ 更新数据库统计信息 √ √ √ √ 跟踪标记 √ √ √ √ 最佳实践 发布与订阅 √ √ √ √ 对接SSRS报表服务 √ √ √ √ 添加链接服务器 √ √ √ √ 父主题: RDS for SQL Server各版本功能概览
-
RDS for SQL Server 2022功能概览 类别 功能 版本 web版 标准版 企业版 单机 单机 主备 集群版 实例管理 购买实例 √ √ √ √ 重启实例 √ √ √ √ 实例续费 √ √ √ √ 变更计费方式 按需实例转包周期 包周期实例转按需 √ √ √ √ 变更实例的CPU和规格 √ √ √ √ 重置密码 √ √ √ √ 单机转主备 × √ × × 磁盘扩容 √ √ √ √ 手动切换主备实例 × × √ √ 删除按需实例 √ √ √ √ 升级引擎版本 × × × × 创建只读实例 × × × √ 修改实例名称 √ √ √ √ 修改备注 √ √ √ √ 创建指定实例字符集 √ √ √ √ 设置可维护时间段 √ √ √ √ 购买相同配置实例 √ √ √ √ 标签 管理标签 √ √ √ √ 任务中心 任务中心 √ √ √ √ 数据迁移 DRS备份迁移 √ √ √ √ 数据库连接 VPC内访问(内网访问) √ √ √ √ 公网访问 √ √ √ √ 修改内网地址 √ √ √ √ 修改数据库端口 √ √ √ √ 备份恢复 全量备份(自动备份) √ √ √ √ 增量备份 √ √ √ √ 下载备份 √ √ √ √ 复制备份 √ √ √ √ 自动备份策略 √ √ √ √ 通过备份文件恢复到RDS for SQL Server √ √ √ √ 恢复实例到指定时间点 √ √ √ √ 手动备份 √ √ √ √ 恢复到新实例 √ √ √ √ 恢复到已有实例 √ √ √ √ 设置跨区域备份策略 √ √ √ √ 监控与告警 资源监控 √ √ √ √ 引擎监控 √ √ √ √ 设置告警规则 √ √ √ √ 参数管理 创建参数模板 √ √ √ √ 比较参数模板 √ √ √ √ 应用参数模板 √ √ √ √ 查看参数模板应用记录 √ √ √ √ 编辑参数模板 √ √ √ √ 导出参数模板 √ √ √ √ 查看参数修改历史 √ √ √ √ 日志管理 查看或下载运行日志 √ √ √ √ 查看或下载慢日志 √ √ √ √ 安全管理 云审计服务 √ √ √ √ 数据库审计(DBSS) √ √ √ √ 服务端加密 √ √ √ √ 数据库安全 √ √ √ √ 修改实例安全组 √ √ √ √ 开启透明数据加密(TDE) × √ √ √ 回收站 设置回收站策略 √ √ √ √ 重建实例 √ √ √ √ 引擎主要功能 账号管理 √ √ √ √ 数据库管理 √ √ √ √ 加域实例创建与访问 √ √ √ √ 分布式事务(MSDTC) √ √ √ √ 集成服务(SSIS) × × × √ 文件流(FileStream) √ × × √ 开启CLR集成功能 √ √ √ √ 创建和配置Agent Job和Dblink √ √ √ √ 存储过程相关 修改自定义数据库名称 √ √ √ √ 变更数据捕获(CDC) × √ √ √ 收缩数据库 √ √ √ √ 数据库复制 √ √ √ √ 更新数据库统计信息 √ √ √ √ 跟踪标记 √ √ √ √ 最佳实践 发布与订阅 √ √ √ √ 对接SSRS报表服务 √ √ √ √ 添加链接服务器 √ √ √ √ 父主题: RDS for SQL Server各版本功能概览
-
样例代码 from kazoo.client import KazooClient zk = KazooClient(hosts='ZookeeperHost') zk.start() result=zk.get("/thriftserver/active_thriftserver") result=result[0].decode('utf-8') JDBCServerHost=result[0].split(":")[0] JDBCServerPort=result[0].split(":")[1] from pyhive import hive conn = hive.Connection(host=JDBCServerHost, port=JDBCServerPort,database='default') cursor=conn.cursor() cursor.execute("select * from test") for result in cursor.fetchall(): print result 其中,ZookeeperHost使用4获取到的zookeeper IP和PORT替换。
-
环境准备 安装支持环境。(开发环境请参考Spark应用开发环境简介准备) 执行以下命令安装编译工具: yum install cyrus-sasl-devel -y yum install gcc-c++ -y 安装相应的python模块。 需要安装sasl,thrift,thrift-sasl,PyHive。 pip install sasl pip install thrift pip install thrift-sasl pip install PyHive 安装python连接zookeeper工具。 pip install kazoo 从MRS集群上获取相应参数。 zookeeper的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.deploy.zookeeper.url zookeeper 上存放JDBCServer主节点的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.thriftserver.zookeeper.dir(默认是/thriftserver),在此znode子节点(active_thriftserver)上存放了JDBCServer主节点的IP和PORT
-
示例 从Kafka源表中读取数据,将GaussDB(DWS)表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下: 连接GaussDB(DWS)数据库实例,在GaussDB(DWS)中创建相应的表,作为维表,表名为area_info,SQL语句如下: 1 2 3 4 5 6 7 8 9 create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR, PRIMARY KEY(area_id) ); 连接GaussDB(DWS)数据库实例,向GaussDB(DWS)维表area_info中插入测试数据,其语句如下: 1 2 3 4 5 6 7 insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); flink sql创建源表、结果表、维表并执行SQL: CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'tableName' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookupCacheMaxRows' = '10000', 'lookupCacheExpireAfterAccess' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'print' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 在Kafka中写入数据: 1 2 3 {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 结果参考如下:
-
语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
-
参数说明 表1 数据库配置 参数 说明 默认值 connector flink框架区分Connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 数据库用户密码。 - 表2 连接配置参数 参数 名称 类型 说明 默认值 connectionSize 读取线程池大小 int 用于执行操作的线程数量 = 数据库连接数量,同写入线程大小。 1 readBatchSize 最多一次将get请求合并提交的数量 int 当查询请求积压后,最大的批量查询数量。 128 readBatchQueueSize get请求缓冲池大小 int 查询请求最大积压容量。 256 readTimeoutMs get操作的超时时间(毫秒/ms) int 默认值0表示不超时,会在两处位置生效: get操作从用户开始执行到client准备提交到dws的等待时间。 get sql的执行超时,即statement query timeout。 0 readSyncThreadEnable 非异步查询时,是否开启线程池 boolean 开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。 true lookupScanEnable 是否开启scan查询 boolean 关联条件在非全主键匹配下,是否开启scan查询。 若为false,则join关联条件必须全为主键,否则将抛异常。 false fetchSize / lookupScanFetchSize scan一次查询大小 int 非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。 1000 lookupScanTimeoutMs scan操作的超时时间(毫秒/ms) int 非全主键匹配下,一次条件查询的超时限制(ms)。 60000 lookupAsync 是否采用异步方式获取数据 boolean 查询方式设置为同步or异步。 true lookupCacheType 缓存策略 LookupCacheType 设置以下缓存策略(不区分大小写): None:无缓存LRU(默认值):缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。 ALL:全量数据缓存,适合不常更新小表。 LookupCacheType.LRU lookupCacheMaxRows 缓存大小 long 当选择LRU缓存策略后,可以设置缓存大小。 1000 lookupCacheExpireAfterAccess 读取后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。 null lookupCacheExpireAfterWrite 写入后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。 10s lookupCacheMissingKey 数据不存在后写入缓存 boolean 当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。 false lookupCacheReloadStrategy 全量缓存重载策略 ReloadStrategy 当选择ALL缓存策略后,可以设置以下数据重载策略: PERIODIC:周期性数据重载。 TIMED:定时数据重载,以天为单位。 ReloadStrategy.PERIODIC lookupCachePeriodicReloadInterval 数据重载时间间隔 Duration 当选择PERIOD重载策略时,可以设置全量缓存重载间隔。 1h lookupCachePeriodicReloadMode 数据重载模式 ScheduleMode 当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写): FIXED_DELAY:从上一个加载结束计算重新加载间隔。 FIXED_RATE:从上一个加载开始计算重新加载间隔。 ScheduleMode.FIXED_DELAY lookupCacheTimedReloadTime 数据重载定时调度时间 string 当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。 00:00 lookupCacheTimedReloadIntervalDays 数据重载定时周期调度间隔天数 int 当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。 1
-
订阅 选择对应设置的发布,单击右键选择“新建订阅”。 图17 新建订阅 创建订阅的发布,单击“下一步”。 图18 创建发布 选择推送订阅,单击“下一步”。 图19 分发代理位置 选择“添加订阅服务器”,支持SQL Server引擎和非SQL Server作为订阅服务器,将创建并根据上述步骤配置的华为云实例作为一个订阅服务器。 图20 添加订阅服务器 选择一个数据库作为订阅对象。 图21 选择订阅数据库 配置与订阅服务器的连接。 图22 配置连接 使用一个长期有效的数据库帐号,保障订阅长期有效,这里的帐号设置可以为登录华为云实例的数据库帐号,单击“确定”。 图23 连接分发服务器 创建订阅成功。 图24 创建订阅 将鼠标移动到发布配置上可以看到对应的订阅信息。 图25 查看订阅
-
发布 创建发布。 展开服务器下的“复制”节点,右键单击“本地发布”,选择“新建发布”。 图10 新建发布数据库 选择事务发布。 选择以表作为发布对象。 图11 发布表 添加筛选对象,进行个性化的发布。 图12 添加筛选对象 事务发布会先创建一个快照以复制表当前的状态。也可以设置快照代理用以执行计划。 图13 快照代理 设置代理安全性,这里需要设置登录帐号为本地sa帐号。 图14 设置代理安全性 设置发布名称,单击“完成”。 图15 完成 创建完成后可以通过复制监视器来查看是否创建发布成功。 图16 启动复制监视器
-
Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序
-
场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
-
数据规划 首先需要把原日志文件放置在HDFS系统里。 本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS客户端路径下建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 进入到HDFS客户端下的“/tmp/input”目录,在Linux系统HDFS客户端使用命令在Linux系统HDFS客户端使用命令hadoop fs -put input_data1.txt /tmp/input和hadoop fs -put input_data2.txt /tmp/input,上传数据文件。
-
运行任务 进入Spark客户端目录,使用java -cp命令运行代码(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 运行Scala样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 集群开启ZooKeeper的SSL特性后(查看ZooKeeper服务的ssl.enabled参数),请在执行命令中添加-Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty两项参数: java -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf
-
数据规划 确保以多主实例模式启动了JDBCServer服务,并至少有一个实例可连接客户端。在JDBCServer节点上分别创建“/home/data”文件,内容如下: Miranda,32 Karlie,23 Candice,27 确保其对启动JDBCServer的用户有读写权限。 确保客户端classpath下有“hive-site.xml”文件,且根据实际集群情况配置所需要的参数。JDBCServer相关参数详情,请参见Spark JDBCServer接口介绍。
-
Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序
更多精彩内容
CDN加速
GaussDB
文字转换成语音
免费的服务器
如何创建网站
域名网站购买
私有云桌面
云主机哪个好
域名怎么备案
手机云电脑
SSL证书申请
云点播服务器
免费OCR是什么
电脑云桌面
域名备案怎么弄
语音转文字
文字图片识别
云桌面是什么
网址安全检测
网站建设搭建
国外CDN加速
SSL免费证书申请
短信批量发送
图片OCR识别
云数据库MySQL
个人域名购买
录音转文字
扫描图片识别文字
OCR图片识别
行驶证识别
虚拟电话号码
电话呼叫中心软件
怎么制作一个网站
Email注册网站
华为VNC
图像文字识别
企业网站制作
个人网站搭建
华为云计算
免费租用云托管
云桌面云服务器
ocr文字识别免费版
HTTPS证书申请
图片文字识别转换
国外域名注册商
使用免费虚拟主机
云电脑主机多少钱
鲲鹏云手机
短信验证码平台
OCR图片文字识别
SSL证书是什么
申请企业邮箱步骤
免费的企业用邮箱
云免流搭建教程
域名价格