云服务器内容精选

  • RDS for SQL Server 2019功能概览 类别 功能 版本 web版 标准版 企业版 单机 单机 主备 集群版 实例管理 购买实例 √ √ √ √ 重启实例 √ √ √ √ 实例续费 √ √ √ √ 变更计费方式 按需实例转包周期 包周期实例转按需 √ √ √ √ 变更实例的CPU和规格 √ √ √ √ 重置密码 √ √ √ √ 单机转主备 × √ × × 磁盘扩容 √ √ √ √ 手动切换主备实例 × × √ √ 删除按需实例 √ √ √ √ 升级引擎版本 × × × × 创建只读实例 × × × √ 修改实例名称 √ √ √ √ 修改备注 √ √ √ √ 创建指定实例字符集 √ √ √ √ 设置可维护时间段 √ √ √ √ 购买相同配置实例 √ √ √ √ 标签 管理标签 √ √ √ √ 任务中心 任务中心 √ √ √ √ 数据迁移 DRS备份迁移 √ √ √ √ 数据库连接 VPC内访问(内网访问) √ √ √ √ 公网访问 √ √ √ √ 修改内网地址 √ √ √ √ 修改数据库端口 √ √ √ √ 备份恢复 全量备份(自动备份) √ √ √ √ 增量备份 √ √ √ √ 下载备份 √ √ √ √ 复制备份 √ √ √ √ 自动备份策略 √ √ √ √ 通过备份文件恢复到RDS for SQL Server √ √ √ √ 恢复实例到指定时间点 √ √ √ √ 手动备份 √ √ √ √ 恢复到新实例 √ √ √ √ 恢复到已有实例 √ √ √ √ 设置跨区域备份策略 √ √ √ √ 监控与告警 资源监控 √ √ √ √ 引擎监控 √ √ √ √ 设置告警规则 √ √ √ √ 参数管理 创建参数模板 √ √ √ √ 比较参数模板 √ √ √ √ 应用参数模板 √ √ √ √ 查看参数模板应用记录 √ √ √ √ 编辑参数模板 √ √ √ √ 导出参数模板 √ √ √ √ 查看参数修改历史 √ √ √ √ 日志管理 查看或下载运行日志 √ √ √ √ 查看或下载慢日志 √ √ √ √ 安全管理 云审计服务 √ √ √ √ 数据库审计(DBSS) √ √ √ √ 服务端加密 √ √ √ √ 数据库安全 √ √ √ √ 修改实例安全组 √ √ √ √ 开启透明数据加密(TDE) × √ √ √ 回收站 设置回收站策略 √ √ √ √ 重建实例 √ √ √ √ 引擎主要功能 账号管理 √ √ √ √ 数据库管理 √ √ √ √ 加域实例创建与访问 √ √ √ √ 分布式事务(MSDTC) √ √ √ √ 集成服务(SSIS) × × × √ 文件流(FileStream) √ × × √ 开启CLR集成功能 √ √ √ √ 创建和配置Agent Job和Dblink √ √ √ √ 存储过程相关 修改自定义数据库名称 √ √ √ √ 变更数据捕获(CDC) × √ √ √ 收缩数据库 √ √ √ √ 数据库复制 √ √ √ √ 更新数据库统计信息 √ √ √ √ 跟踪标记 √ √ √ √ 最佳实践 发布与订阅 √ √ √ √ 对接SSRS报表服务 √ √ √ √ 添加链接服务器 √ √ √ √ 父主题: RDS for SQL Server各版本功能概览
  • RDS for SQL Server 2022功能概览 类别 功能 版本 web版 标准版 企业版 单机 单机 主备 集群版 实例管理 购买实例 √ √ √ √ 重启实例 √ √ √ √ 实例续费 √ √ √ √ 变更计费方式 按需实例转包周期 包周期实例转按需 √ √ √ √ 变更实例的CPU和规格 √ √ √ √ 重置密码 √ √ √ √ 单机转主备 × √ × × 磁盘扩容 √ √ √ √ 手动切换主备实例 × × √ √ 删除按需实例 √ √ √ √ 升级引擎版本 × × × × 创建只读实例 × × × √ 修改实例名称 √ √ √ √ 修改备注 √ √ √ √ 创建指定实例字符集 √ √ √ √ 设置可维护时间段 √ √ √ √ 购买相同配置实例 √ √ √ √ 标签 管理标签 √ √ √ √ 任务中心 任务中心 √ √ √ √ 数据迁移 DRS备份迁移 √ √ √ √ 数据库连接 VPC内访问(内网访问) √ √ √ √ 公网访问 √ √ √ √ 修改内网地址 √ √ √ √ 修改数据库端口 √ √ √ √ 备份恢复 全量备份(自动备份) √ √ √ √ 增量备份 √ √ √ √ 下载备份 √ √ √ √ 复制备份 √ √ √ √ 自动备份策略 √ √ √ √ 通过备份文件恢复到RDS for SQL Server √ √ √ √ 恢复实例到指定时间点 √ √ √ √ 手动备份 √ √ √ √ 恢复到新实例 √ √ √ √ 恢复到已有实例 √ √ √ √ 设置跨区域备份策略 √ √ √ √ 监控与告警 资源监控 √ √ √ √ 引擎监控 √ √ √ √ 设置告警规则 √ √ √ √ 参数管理 创建参数模板 √ √ √ √ 比较参数模板 √ √ √ √ 应用参数模板 √ √ √ √ 查看参数模板应用记录 √ √ √ √ 编辑参数模板 √ √ √ √ 导出参数模板 √ √ √ √ 查看参数修改历史 √ √ √ √ 日志管理 查看或下载运行日志 √ √ √ √ 查看或下载慢日志 √ √ √ √ 安全管理 云审计服务 √ √ √ √ 数据库审计(DBSS) √ √ √ √ 服务端加密 √ √ √ √ 数据库安全 √ √ √ √ 修改实例安全组 √ √ √ √ 开启透明数据加密(TDE) × √ √ √ 回收站 设置回收站策略 √ √ √ √ 重建实例 √ √ √ √ 引擎主要功能 账号管理 √ √ √ √ 数据库管理 √ √ √ √ 加域实例创建与访问 √ √ √ √ 分布式事务(MSDTC) √ √ √ √ 集成服务(SSIS) × × × √ 文件流(FileStream) √ × × √ 开启CLR集成功能 √ √ √ √ 创建和配置Agent Job和Dblink √ √ √ √ 存储过程相关 修改自定义数据库名称 √ √ √ √ 变更数据捕获(CDC) × √ √ √ 收缩数据库 √ √ √ √ 数据库复制 √ √ √ √ 更新数据库统计信息 √ √ √ √ 跟踪标记 √ √ √ √ 最佳实践 发布与订阅 √ √ √ √ 对接SSRS报表服务 √ √ √ √ 添加链接服务器 √ √ √ √ 父主题: RDS for SQL Server各版本功能概览
  • 样例代码 from kazoo.client import KazooClient zk = KazooClient(hosts='ZookeeperHost') zk.start() result=zk.get("/thriftserver/active_thriftserver") result=result[0].decode('utf-8') JDBCServerHost=result[0].split(":")[0] JDBCServerPort=result[0].split(":")[1] from pyhive import hive conn = hive.Connection(host=JDBCServerHost, port=JDBCServerPort,database='default') cursor=conn.cursor() cursor.execute("select * from test") for result in cursor.fetchall(): print result 其中,ZookeeperHost使用4获取到的zookeeper IP和PORT替换。
  • 环境准备 安装支持环境。(开发环境请参考Spark应用开发环境简介准备) 执行以下命令安装编译工具: yum install cyrus-sasl-devel -y yum install gcc-c++ -y 安装相应的python模块。 需要安装sasl,thrift,thrift-sasl,PyHive。 pip install sasl pip install thrift pip install thrift-sasl pip install PyHive 安装python连接zookeeper工具。 pip install kazoo 从MRS集群上获取相应参数。 zookeeper的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.deploy.zookeeper.url zookeeper 上存放JDBCServer主节点的IP和PORT: 可以查看配置文件/opt/client/Spark/spark/conf/hive-site.xml中的配置项spark.thriftserver.zookeeper.dir(默认是/thriftserver),在此znode子节点(active_thriftserver)上存放了JDBCServer主节点的IP和PORT
  • 示例 从Kafka源表中读取数据,将GaussDB(DWS)表作为维表,并将二者生成的宽表信息写入print结果表中,其具体步骤如下: 连接GaussDB(DWS)数据库实例,在GaussDB(DWS)中创建相应的表,作为维表,表名为area_info,SQL语句如下: 1 2 3 4 5 6 7 8 9 create table public.area_info( area_id VARCHAR, area_province_name VARCHAR, area_city_name VARCHAR, area_county_name VARCHAR, area_street_name VARCHAR, region_name VARCHAR, PRIMARY KEY(area_id) ); 连接GaussDB(DWS)数据库实例,向GaussDB(DWS)维表area_info中插入测试数据,其语句如下: 1 2 3 4 5 6 7 insert into area_info (area_id, area_province_name, area_city_name, area_county_name, area_street_name, region_name) values ('330102', 'a1', 'b1', 'c1', 'd1', 'e1'), ('330106', 'a1', 'b1', 'c2', 'd2', 'e1'), ('330108', 'a1', 'b1', 'c3', 'd3', 'e1'), ('330110', 'a1', 'b1', 'c4', 'd4', 'e1'); flink sql创建源表、结果表、维表并执行SQL: CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, proctime as Proctime() ) WITH ( 'connector' = 'kafka', 'topic' = 'order_test', 'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort', 'properties.group.id' = 'dws-order', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); --创建地址维表 create table area_info ( area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) WITH ( 'connector' = 'dws', 'url' = 'jdbc:gaussdb://DwsAddress:DwsPort/DwsDbName', 'tableName' = 'area_info', 'username' = 'DwsUserName', 'password' = 'DwsPassword', 'lookupCacheMaxRows' = '10000', 'lookupCacheExpireAfterAccess' = '2h' ); --根据地址维表生成详细的包含地址的订单信息宽表 create table order_detail( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string, area_province_name string, area_city_name string, area_county_name string, area_street_name string, region_name string ) with ( 'connector' = 'print' ); insert into order_detail select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name, area.area_id, area.area_province_name, area.area_city_name, area.area_county_name, area.area_street_name, area.region_name from orders left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id; 在Kafka中写入数据: 1 2 3 {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"} {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"} 结果参考如下:
  • 语法格式 1 2 3 4 5 6 7 8 9 10 11 create table dwsSource ( attr_name attr_type (',' attr_name attr_type)* ) with ( 'connector' = 'dws', 'url' = '', 'tableName' = '', 'username' = '', 'password' = '' );
  • 参数说明 表1 数据库配置 参数 说明 默认值 connector flink框架区分Connector参数,固定为dws。 - url 数据库连接地址。 - username 配置连接用户。 - password 数据库用户密码。 - 表2 连接配置参数 参数 名称 类型 说明 默认值 connectionSize 读取线程池大小 int 用于执行操作的线程数量 = 数据库连接数量,同写入线程大小。 1 readBatchSize 最多一次将get请求合并提交的数量 int 当查询请求积压后,最大的批量查询数量。 128 readBatchQueueSize get请求缓冲池大小 int 查询请求最大积压容量。 256 readTimeoutMs get操作的超时时间(毫秒/ms) int 默认值0表示不超时,会在两处位置生效: get操作从用户开始执行到client准备提交到dws的等待时间。 get sql的执行超时,即statement query timeout。 0 readSyncThreadEnable 非异步查询时,是否开启线程池 boolean 开启后future.get()异步阻塞,关闭后主线程同步调用阻塞。 true lookupScanEnable 是否开启scan查询 boolean 关联条件在非全主键匹配下,是否开启scan查询。 若为false,则join关联条件必须全为主键,否则将抛异常。 false fetchSize / lookupScanFetchSize scan一次查询大小 int 非全主键匹配下,一次条件查询的返回数量限制(默认fetchSize生效,当fetchSize为0时,lookupScanFetchSize生效)。 1000 lookupScanTimeoutMs scan操作的超时时间(毫秒/ms) int 非全主键匹配下,一次条件查询的超时限制(ms)。 60000 lookupAsync 是否采用异步方式获取数据 boolean 查询方式设置为同步or异步。 true lookupCacheType 缓存策略 LookupCacheType 设置以下缓存策略(不区分大小写): None:无缓存LRU(默认值):缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果未找到,则去物理维表中查找。 ALL:全量数据缓存,适合不常更新小表。 LookupCacheType.LRU lookupCacheMaxRows 缓存大小 long 当选择LRU缓存策略后,可以设置缓存大小。 1000 lookupCacheExpireAfterAccess 读取后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次读取后,超时时间顺延长,默认不生效。 null lookupCacheExpireAfterWrite 写入后开始计算的超时时间 Duration 当选择LRU缓存策略后,可以设置每次写入后,超时时间固定,不论访问与否。 10s lookupCacheMissingKey 数据不存在后写入缓存 boolean 当选择LRU缓存策略后,维表数据不存在,同时将数据缓存。 false lookupCacheReloadStrategy 全量缓存重载策略 ReloadStrategy 当选择ALL缓存策略后,可以设置以下数据重载策略: PERIODIC:周期性数据重载。 TIMED:定时数据重载,以天为单位。 ReloadStrategy.PERIODIC lookupCachePeriodicReloadInterval 数据重载时间间隔 Duration 当选择PERIOD重载策略时,可以设置全量缓存重载间隔。 1h lookupCachePeriodicReloadMode 数据重载模式 ScheduleMode 当选择PERIOD重载策略时,可以设置以下重载模式(不区分大小写): FIXED_DELAY:从上一个加载结束计算重新加载间隔。 FIXED_RATE:从上一个加载开始计算重新加载间隔。 ScheduleMode.FIXED_DELAY lookupCacheTimedReloadTime 数据重载定时调度时间 string 当选择TIMED重载策略时,可以设置全量缓存重载时间,以ISO-8601格式表示。例如:“10:15”。 00:00 lookupCacheTimedReloadIntervalDays 数据重载定时周期调度间隔天数 int 当选择TIMED重载策略时,可以设置全量缓存周期调度间隔天数。 1
  • 订阅 选择对应设置的发布,单击右键选择“新建订阅”。 图17 新建订阅 创建订阅的发布,单击“下一步”。 图18 创建发布 选择推送订阅,单击“下一步”。 图19 分发代理位置 选择“添加订阅服务器”,支持SQL Server引擎和非SQL Server作为订阅服务器,将创建并根据上述步骤配置的华为云实例作为一个订阅服务器。 图20 添加订阅服务器 选择一个数据库作为订阅对象。 图21 选择订阅数据库 配置与订阅服务器的连接。 图22 配置连接 使用一个长期有效的数据库帐号,保障订阅长期有效,这里的帐号设置可以为登录华为云实例的数据库帐号,单击“确定”。 图23 连接分发服务器 创建订阅成功。 图24 创建订阅 将鼠标移动到发布配置上可以看到对应的订阅信息。 图25 查看订阅
  • 发布 创建发布。 展开服务器下的“复制”节点,右键单击“本地发布”,选择“新建发布”。 图10 新建发布数据库 选择事务发布。 选择以表作为发布对象。 图11 发布表 添加筛选对象,进行个性化的发布。 图12 添加筛选对象 事务发布会先创建一个快照以复制表当前的状态。也可以设置快照代理用以执行计划。 图13 快照代理 设置代理安全性,这里需要设置登录帐号为本地sa帐号。 图14 设置代理安全性 设置发布名称,单击“完成”。 图15 完成 创建完成后可以通过复制监视器来查看是否创建发布成功。 图16 启动复制监视器
  • Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序
  • 场景说明 假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发Spark应用程序实现如下功能: 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。 log1.txt:周六网民停留日志 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60 log2.txt:周日网民停留日志 LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
  • 数据规划 首先需要把原日志文件放置在HDFS系统里。 本地新建两个文本文件input_data1.txt和input_data2.txt,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。 在HDFS客户端路径下建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下: 在Linux系统HDFS客户端使用命令hadoop fs -mkdir /tmp/input(hdfs dfs命令有同样的作用),创建对应目录。 进入到HDFS客户端下的“/tmp/input”目录,在Linux系统HDFS客户端使用命令在Linux系统HDFS客户端使用命令hadoop fs -put input_data1.txt /tmp/input和hadoop fs -put input_data2.txt /tmp/input,上传数据文件。
  • 运行任务 进入Spark客户端目录,使用java -cp命令运行代码(类名与文件名等请与实际代码保持一致,此处仅为示例): 运行Java样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 运行Scala样例代码: java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf 集群开启ZooKeeper的SSL特性后(查看ZooKeeper服务的ssl.enabled参数),请在执行命令中添加-Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty两项参数: java -Dzookeeper.client.secure=true -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/hive/*:$SPARK_HOME/conf:/opt/female/SparkThriftServerJavaExample-1.0.jar com.huawei.bigdata.spark.examples.ThriftServerQueriesTest $SPARK_HOME/conf/hive-site.xml $SPARK_HOME/conf/spark-defaults.conf
  • 数据规划 确保以多主实例模式启动了JDBCServer服务,并至少有一个实例可连接客户端。在JDBCServer节点上分别创建“/home/data”文件,内容如下: Miranda,32 Karlie,23 Candice,27 确保其对启动JDBCServer的用户有读写权限。 确保客户端classpath下有“hive-site.xml”文件,且根据实际集群情况配置所需要的参数。JDBCServer相关参数详情,请参见Spark JDBCServer接口介绍。
  • Flink Jar作业提交SQL样例程序(Java) 提交SQL的核心逻辑如下,目前只支持提交CREATE和INSERT语句。完整代码参见com.huawei.bigdata.flink.examples.FlinkSQLExecutor。 public class FlinkSQLExecutor { public static void main(String[] args) throws IOException { System.out.println("-------------------- begin init ----------------------"); final String sqlPath = ParameterTool.fromArgs(args).get("sql", "config/redisSink.sql"); final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, bsSettings); StatementSet statementSet = tableEnv.createStatementSet(); String sqlStr = FileUtils.readFileToString(FileUtils.getFile(sqlPath), "utf-8"); String[] sqlArr = sqlStr.split(";"); for (String sql : sqlArr) { sql = sql.trim(); if (sql.toLowerCase(Locale.ROOT).startsWith("create")) { System.out.println("----------------------------------------------\nexecuteSql=\n" + sql); tableEnv.executeSql(sql); } else if (sql.toLowerCase(Locale.ROOT).startsWith("insert")) { System.out.println("----------------------------------------------\ninsert=\n" + sql); statementSet.addInsertSql(sql); } } System.out.println("---------------------- begin exec sql --------------------------"); statementSet.execute(); } } 需将当前样例需要的依赖包,即编译之后lib文件下面的jar包复制到客户端的lib文件夹内。 以对接普通模式Kafka提交SQL为例: create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source; 父主题: Flink Jar作业提交SQL样例程序