华为云用户手册

  • 内容导航 SDK开发指南指导您如何安装和配置开发环境、如何通过调用DIS SDK提供的接口函数进行二次开发。 章节 内容 DIS SDK能做什么 内容导航 简要介绍DIS的概念和DIS SDK的概念。 SDK下载 兼容性 如何校验软件包完整性? 介绍使用DIS SDK进行二次开发过程中涉及到的资源信息。 开通DIS服务 介绍DIS服务和DIS通道的开通方式。 获取认证信息 介绍使用DIS SDK进行二次开发前需要进行的初始化工作。 Python:准备环境~~获取数据游标 介绍使用DIS SDK进行的常用操作(匹配python)。 Java:准备环境~~变更分区数量 介绍使用DIS SDK进行的常用操作(匹配java)。 DIS服务端错误码 介绍使用DIS SDK过程中遇到异常时的响应信息。 父主题: 简介
  • 查询APP列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 listApp_test 方法中的(limit可设置单次请求返回APP列表的最大数量取值范围:1~100)。 配置参数如下: 1 startAppName="app1" #APP名称(从该通道开始返回app列表,返回的app列表不包括此app名称。) 配置好以上参数,执行listApp_sample.py文件调用Applist_test方法。 响应结果如下: 12 200{'has_more_app': False, 'apps': [{'app_id': 'kpvGNrFYfKjpqTSdPIX', 'create_time': 1543301301992, 'app_name': 'sadfghjkl'}, {'app_id': 'MtPG1lD1E7IesDuOcNt', 'create_time': 1542765418080, 'app_name': 'testAppName2'}]} 父主题: 使用SDK(Python)
  • 查询Checkpoint 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 其中,“streamName”的配置值要与开通DIS通道中“通道名称”的值一致,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见获取认证信息。 123456789 GetCheckpointRequest getCheckpointRequest = new GetCheckpointRequest();// 设置App名称(注: 1.3.0及以前版本使用setAppId,1.3.1及后续版本请使用setAppNamegetCheckpointRequest.setAppName(appName);getCheckpointRequest.setStreamName(streamName);// 分区编号getCheckpointRequest.setPartitionId("0");// Checkpoint类型getCheckpointRequest.setCheckpointType(CheckpointTypeEnum.LAST_READ.name());System.out.println("getCheckpoint: " + JsonUtils.objToJson(dic.getCheckpoint(getCheckpointRequest))); 查询Checkpoint的返回信息如下。 12 getCheckpoint:{"sequence_number": "10", "metadata": "metadata"} 父主题: 使用SDK(Java)
  • 参数说明 表2 参数说明 参数名 参数类型 说明 partitionId String 分区ID。 说明: 请根据上传流式数据的执行结果,控制台的返回信息字段,例如 “partitionId [shardId-0000000000]”进行定义。 startingSequenceNumber String 序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 说明: 请根据上传流式数据的执行结果,控制台的返回信息字段,例如“sequenceNumber [1]”进行定义。 cursorType String 游标类型。 AT_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。 AFTER_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)后的记录开始读取数据。 TRIM_HORIZON:从最早被存储至分区的有效记录开始读取。 例如,某租户使用DIS的通道,分别上传了三条数据A1,A2,A3。N天后(设定A1已过期,A2和A3仍在有效期范围内),该租户需要下载数据,并选择了TRIM_HORIZON这种下载方式。那么用户可下载的数据将从A2开始读取。 LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。 AT_TIMESTAMP:从特定时间戳(即timestamp定义的时间戳)开始读取。
  • 背景信息 下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置后,再循环获取数据。 获取游标有如下五种方式: AT_SEQUENCE_NUMBER AFTER_SEQUENCE_NUMBER TRIM_HORIZON LATEST AT_TIMESTAMP 为更好理解游标类型,您需要了解如下几个基本概念。 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用) 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其sequenceNumberRange代表数据有效范围,第一个值为最老数据的sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据的sequenceNumber为此值-1) 例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为200。
  • 操作步骤 使用“WinSCP”工具将“huaweicloud-sdk-dis-x.x.x.zip”上传至Linux系统任一目录。 x.x.x表示DIS SDK包的版本号。 使用“PuTTY”工具登录Linux系统,进入到“huaweicloud-sdk-dis-x.x.x.zip”所在目录,执行如下命令,获取DIS SDK压缩包的校验码。 sha256sum huaweicloud-sdk-dis-x.x.x.zip 显示类似如下校验码: # sha256sum dis-sdk-x.x.x.zip8be2c937e8d78b1a9b99777cee4e7131f8bf231de3f839cf214e7c5b5ba3c088 huaweicloud-sdk-dis-x.x.x.zip 打开DIS SDK的校验文件“huaweicloud-sdk-dis-x.x.x.zip.sha256sum”与上一步骤中获取的校验码进行对比。 一致,说明从获取的DIS SDK压缩包没被篡改。 不一致,说明DIS SDK压缩包被篡改,需要重新获取。
  • 查询通道详情 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK查询指定通道信息。 1234 String streamName = "myStream";DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();describeStreamRequest.setStreamName(streamName);System.out.println("descStream: " + JsonUtils.objToJson(dic.describeStream(describeStreamRequest))); 查询通道的返回信息如下。 1 descStream: DescribeStreamResult [streamId=JnYbsMfWNn81e8n2mOC, streamName=myStream, createTime=1540977519187, lastModifiedTime=1540977519983, retentionPeriod=24, status=RUNNING, streamType=ADVANCED, dataType=BLOB, writablePartitionCount=2, readablePartitionCount=2, partitions=[PartitionResult{partitionId='shardId-0000000000', hashRange='[0 : 4611686018427387902]', status='ACTIVE', parentPartitionIds='null', sequenceNumberRange='[0 : 13286444]'}, PartitionResult{partitionId='shardId-0000000001', hashRange='[4611686018427387903 : 9223372036854775807]', status='ACTIVE', parentPartitionIds='null', sequenceNumberRange='[0 : 13288589]'}], hasMorePartitions=false, updatePartitionCounts=null] 父主题: 使用SDK(Java)
  • 获取数据游标 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK获取数据游标信息。 1 2 3 4 5 6 7 8 910111213141516 // 配置通道名称String streamName = "myStream";// 配置数据下载分区IDString partitionId = "0";// 配置下载数据序列号String startingSequenceNumber = "0";// 配置下载数据方式String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();GetPartitionCursorRequest request = new GetPartitionCursorRequest();request.setStreamName(streamName);request.setPartitionId(partitionId);request.setStartingSequenceNumber(startingSequenceNumber);request.setCursorType(cursorType);GetPartitionCursorResult response = dic.getPartitionCursor(request);String cursor = response.getPartitionCursor(); 父主题: 使用SDK(Java)
  • 获取数据游标 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: partitionId="shardId-0000000000" streamname=”dis-test1“ #已存在的通道名5种游标设置使用参考如下:# startSeq与AT_SEQUENCE_NUMBER/AFTER_SEQUENCE_NUMBER搭配使用r = cli.getCursor(streamname, partitionId, cursorType='AT_SEQUENCE_NUMBER', startSeq="0")# r = cli.getCursor(streamname, partitionId, cursorType='AFTER_SEQUENCE_NUMBER', startSeq="0")# timestamp与AT_TIMESTAMP搭配使用# r = cli.getCursor(streamname, partitionId, cursorType='AT_TIMESTAMP',timestamp=1554694135190)# r = cli.getCursor(streamname, partitionId, cursorType='TRIM_HORIZON')# r = cli.getCursor(streamname, partitionId, cursorType='LATEST') 配置好以上参数,执行getCursor_sample.py文件调用getCursor_test方法,响应结果示例如下: 12 200{"partition_cursor": "eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiSCIsInBhcnRpdGlvbi1pZCI6InNoYXJkSWQtMDAwMDAwMDAwMCIsImN1cnNvci10eXBlIjoiQVRfU0VRVUVOQ0VfTlVNQkVSIiwic3RhcnRpbmctc2VxdWVuY2UtbnVtYmVyIjoiMCJ9LCJnZW5lcmF0ZVRpbWVzdGFtcCI6MTUzMjQyNDg4NzE1NH0"} 父主题: 使用SDK(Python)
  • 新增Checkpoint 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK创建Checkpoint,需要指定通道名称、APP名称、分区编号、序列号以及Checkpoint类型。 其中,“streamName”的配置值要与开通DIS通道中“Stream Name”的值一致。 1 2 3 4 5 6 7 8 910111213 // 通道名称 String streamName = "myStream"; // APP名称String appName = "myApp";CommitCheckpointRequest commitCheckpointRequest = new CommitCheckpointRequest();commitCheckpointRequest.setStreamName(streamName);commitCheckpointRequest.setAppName(appName);// 需要提交的sequenceNumbercommitCheckpointRequest.setSequenceNumber("100");// 分区编号commitCheckpointRequest.setPartitionId("0");// Checkpoint类型commitCheckpointRequest.setCheckpointType(CheckpointTypeEnum.LAST_READ.name()); 配置“CommitCheckpointRequest”对象之后,通过调用 commitCheckpoint 提交checkpoint。 dic.commitCheckpoint(commitCheckpointRequest); 父主题: 使用SDK(Java)
  • 变更分区数量 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 12 streamname = "" #已存在的running状态通道名target_partition_count =”3” #变更后的数量值 配置好以上参数,执行changepartitionQuantity_sample.py文件调用changepartitionQuantity_test方法,响应结果如下: 12345 { "stream_name":"stream_name_test", "current_partition_count":2 "target_partition_count":5 } 父主题: 使用SDK(Python)
  • 查询APP详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 appname=”app1” #查询的APP名称 配置好以上参数,执行describeApp_sample.py文件调用describeApp_test方法。 响应结果如下: 12 200{'app_name': 'app1', 'app_id': 'OPKQuggQVtfqhyvK0cs', 'create_time': 1532425956631} 父主题: 使用SDK(Python)
  • 新增Checkpoint 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 12345 streamname = "" #通道名称appName="xx" # APP名称(APP是已存在状态)partitionId="shardId-0000000000" #分区的唯一标识符。seqNumber="0" #序列号metadata="" #用户消费程序端的元数据信息,元数据信息的最大长度为1000个字符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行commitCheckpoint_sample.py文件调用commitCheckpoint_test方法,响应201表示成功。 父主题: 使用SDK(Python)
  • 初始化DIS客户端 您可以使用以下两种方法初始化DIS SDK客户端实例,优先选择使用代码进行初始化。其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参考获取认证信息。 使用代码初始化DIS SDK客户端实例。 1 2 3 4 5 6 7 8 910111213 // 创建DIS客户端实例DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") // 以下配置失败时的重试次数 .withProperty(DISConfig.PROPERTY_PRODUCER_RECORDS_RETRIES, "-1") .withProperty(DISConfig.PROPERTY_PRODUCER_EXCEPTION_RETRIES, "-1") .build(); 若需要使用代理,请使用如下方法初始化DIS客户端: 1 2 3 4 5 6 7 8 9101112131415161718 // 创建DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") .withProxyHost("YOUR_PROXY_HOST") // 代理IP .withProxyPort("YOUR_PROXY_PORT") // 代理端口 .withProxyProtocol(Protocol.HTTP) // 代理协议,默认为HTTP .withProxyUsername("YOUR_PROXY_USER_NAME") // 代理用户名(可选) .withProxyPassword("YOUR_PROXY_PASSWORD") // 代理密码(可选) // 以下配置失败时的重试次数 .withProperty(DISConfig.PROPERTY_PRODUCER_RECORDS_RETRIES, "-1") .withProperty(DISConfig.PROPERTY_PRODUCER_EXCEPTION_RETRIES, "-1") .build(); 若需在客户端设置DIS连接超时时间,请使用如下方法初始化DIS客户端: 123456789 // 创建DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") .withProperty(DISConfig.PROPERTY_CONNECTION_TIMEOUT, "60") // 单位:秒 .build(); 若需要开启传输压缩,请使用如下方法初始化DIS客户端: 1 2 3 4 5 6 7 8 910111213 // 创建DIS客户端实例DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") .withBodyCompressEnabled(true) .withBodyCompressType(CompressionType.ZSTD) // 配置压缩算法,当前支持lz4和zstd,默认值为lz4 // 以下配置失败时的重试次数 .withProperty(DISConfig.PROPERTY_PRODUCER_RECORDS_RETRIES, "-1") .withProperty(DISConfig.PROPERTY_PRODUCER_EXCEPTION_RETRIES, "-1") .build(); 若需在客户端将 数据加密 后再上传到DIS,DIS SDK提供了加密方法。即在构建disclient时增加参数DataEncryptEnabled和data.password。 1 2 3 4 5 6 7 8 910111213 // 创建DIS客户端实例DIS dic = DISClientBuilder.standard() .withEndpoint("YOUR_ENDPOINT") .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("YOUR_PROJECT_ID") .withRegion("YOUR_REGION") .withDataEncryptEnabled(true) .withProperty("data.password", "xxx") //xxx替换为用户配置的数据加密密钥 // 以下配置失败时的重试次数 .withProperty(DISConfig.PROPERTY_PRODUCER_RECORDS_RETRIES, "-1") .withProperty(DISConfig.PROPERTY_PRODUCER_EXCEPTION_RETRIES, "-1") .build(); 若使用JAVA SDK加密上传数据,读取数据也需要使用JAVA SDK配置相同的密钥。 使用配置文件初始化DIS SDK客户端实例。 在“dis-sdk-demo\src\main\resources”目录下的“dis.properties”文件中添加如下配置项。 ak/sk:用户在 IAM 中创建的AK/SK。 region:用户使用通道所在的区域。 endpoint:DIS的访问地址。 projectId:通道所在的资源ID。 12 // 创建DIS客户端实例 DIS dic = DISClientBuilder.standard().build(); 父主题: 使用SDK(Java)
  • 查询通道详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 streamname="dis-test1" #已存在的通道名 配置好以上参数后,执行describeStream_sample.py文件默认调用describeStream_test方法。 返回信息如下: 12 200{"status": "RUNNING", "stream_name": "dis-test1", "data_type": "BLOB", "has_more_partitions": false, "stream_type": "COMMON", "stream_id": "L84hxfES223eVrFyxiE", "retention_period": 168, "create_time": 1532423353637, "last_modified_time": 1532423354625, "partitions": [{"status": "ACTIVE", "hash_range": "[0 : 9223372036854775807]", "sequence_number_range": "[0 : 10]", "partition_id": "shardId-0000000000"}]} 父主题: 使用SDK(Python)
  • 查询通道列表 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 1 start_stream_name = "" #可设置为空,或是已存在的通道名 执行listStream_sample.py文件默认调用listStream_test方法,获取响应200查询成功。 通道列表的返回信息示例如下: 12 200{'stream_names': ['dis-jLGp', 'dis-w_p', 'dis_test1', 'dis_test2'], 'has_more_streams': False, 'total_number': 4} 父主题: 使用SDK(Python)
  • 查询Checkpoint 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 123 streamname = "" #通道名称appName="xx" # APP名称(APP是已存在状态)partitionId="shardId-0000000000" #分区的唯一标识符 partitionId可通过查询通道详情获取,需要先传入当前设置的通道名称。 配置好以上参数,执行getCheckpoint_sample.py文件调用getCheckpoint_test方法,响应结果如下: 1234 { "sequence_number": "10", "metadata": "metadata" } 父主题: 使用SDK(Python)
  • 删除通道 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK删除指定的DIS通道。 1234 //待删除的通道名称String streamName = "myStream";DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest();deleteStreamRequest.setStreamName(streamName); 配置“DeleteStreamRequest”对象之后,通过对客户端调用deleteStream的方法删除通道。 1 dic.deleteStream(deleteStreamRequest); 父主题: 使用SDK(Java)
  • 删除转储任务 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 12 streamname = "" #已存在的通道名称task_name="xx" task_name配置为特定的转储任务名称,则删除通道下的该转储任务。 配置好以上参数后,执行delete_dump_task_sample.py文件默认调用delete_dump_task_test方法,获取响应204删除成功。 父主题: 使用SDK(Python)
  • 添加转储到 MapReduce服务 MRS )的转储任务 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627282930 CreateTransferTaskRequest request = new CreateTransferTaskRequest();//配置通道名称:用户在 数据接入服务 (简称DIS)控制台创建通道request.setStreamName(streamName);//添加MRS转储任务,并设置任务名称MRSDestinationDescriptorRequest descriptor = new MRSDestinationDescriptorRequest();descriptor.setTransferTaskName(taskName);// 配置MRS集群信息:集群名称和集群ID。可通过弹性大数据服务(简称MRS)控制台创建和查询,集群需为非安全模式descriptor.setMrsClusterName("mrs_dis");descriptor.setMrsClusterId("fe69a732-c7d3-4b0f-8cda-ec9eca0cf141");// 转储MRS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹descriptor.setObsBucketPath("obs-dis");descriptor.setFilePrefix("transfertask");// 转储周期,单位sdescriptor.setDeliverTimeInterval(900);// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。descriptor.setAgencyName("dis_admin_agency");// 转储OBS的目标文件格式:默认text,可配置parquet、carbondescriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());request.setMrsDestinationDescriptor(descriptor);
  • 添加转储到 数据湖探索 服务( DLI )的转储任务 1 2 3 4 5 6 7 8 9101112131415161718192021222324252627 CreateTransferTaskRequest request = new CreateTransferTaskRequest();// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道request.setStreamName(streamName);// 添加DLI转储任务,并设置任务名称UqueryDestinationDescriptorRequest descriptor = new UqueryDestinationDescriptorRequest();descriptor.setTransferTaskName(taskName);// 配置DLI相关信息:数据库和内表名称。可通过 数据湖 探索(简称DLI)控制台创建和查询,DLI表需为内表descriptor.setDliDatabaseName("dis_dli");descriptor.setDliTableName("dis_test");// 转储DLI通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹descriptor.setObsBucketPath("obs-dis");descriptor.setFilePrefix("transfertask");// 转储周期,单位sdescriptor.setDeliverTimeInterval(900);// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。descriptor.setAgencyName("dis_admin_agency");// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());request.setDliDestinationDescriptor(descriptor);
  • 添加转储到 数据仓库 服务(DWS)的转储任务 1 2 3 4 5 6 7 8 910111213141516171819202122232425262728293031323334353637 CreateTransferTaskRequest request = new CreateTransferTaskRequest();//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道request.setStreamName(streamName);//添加DWS转储任务,并设置任务名称DwsDestinationDescriptorRequest descriptor = new DwsDestinationDescriptorRequest();descriptor.setTransferTaskName(taskName);// 配置DWS集群信息:集群名称、集群ID、数据库等信息。可通过数据仓库服务(简称DWS)控制台创建和查询集群,并通过客户端或其他方式创建数据表descriptor.setDwsClusterName("dis_test");descriptor.setDwsClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");descriptor.setDwsDatabaseName("postgres");descriptor.setDwsSchema("dbadmin");descriptor.setDwsTableName("distable01");descriptor.setDwsDelimiter("|");descriptor.setUserName(System.getenv("DB_ADMIN"));descriptor.setUserPassword(System.getenv("DB_PASSWORD"));//DIS调用KMS服务加密存储DWS的密码,保证用户数据安全:用户可通过数据加密服务(简称KMS)控制台的"密钥管理"创建和查询KMS密钥信息descriptor.setKmsUserKeyName("qiyinshan");descriptor.setKmsUserKeyId("9521c600-64a8-4971-ad36-7bbfa6d00c41");// 转储DWS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹descriptor.setObsBucketPath("obs-dis");descriptor.setFilePrefix("transfertask");// 转储周期,单位sdescriptor.setDeliverTimeInterval(900);// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。descriptor.setAgencyName("dis_admin_agency");// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());request.setDwsDestinationDescriptor(descriptor);
  • 变更分区数量 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 其中,“streamName”的配置值要与开通DIS通道中“通道名称”的值一致,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见获取认证信息。 1 2 3 4 5 6 7 8 9101112131415 // 目标分区数量int targetPartitionCount = 2; UpdatePartitionCountRequest update = new UpdatePartitionCountRequest();update.setStreamName(streamName);update.setTargetPartitionCount(targetPartitionCount);try{ UpdatePartitionCountResult updatePartitionCountResult = dic.updatePartitionCount(update); LOG GER.info("Success to update partition count, {}", updatePartitionCountResult);}catch (Exception e){ LOGGER.error("Failed to update partition count", e);} 变更分区数量成功的返回信息如下。 1 Success to update partition count, UpdatePartitionCountResult [currentPartitionCount=2, streamName=mystream, targetPartitionCount=2] 父主题: 使用SDK(Java)
  • Protobuf格式下载 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 初始化DIS客户端,加入一项参数bodySerializeType,如下所示:cli = disclient(endpoint='', ak=os.environ.get("HUAWEICLOUD_SDK_AK"), sk=os.environ.get("HUAWEICLOUD_SDK_SK"), projectid='', region='',bodySerializeType='protobuf')
  • 查询转储详情 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 配置参数如下: 12 streamname="dis-test1"#已存在的通道名task_name="test_1" #查询该通道下的xx转储任务 配置好以上参数后,执行describe_dump_task_sample.py文件默认调用describe_dump_task_test方法。 返回信息如下: 12 200{'state': 'RUNNING', 'stream_name': 'dis-test1', 'create_time': 1537949648144, 'last_transfer_timestamp': 1538018072564, 'destination_type': 'OBS', 'obs_destination_description': {'obs_bucket_path': '002', 'deliver_time_interval': 30, 'retry_duration': 0, 'agency_name': 'all', 'partition_format': 'yyyy/MM/dd/HH/mm', 'destination_file_type': 'text', 'record_delimiter': '|', 'consumer_strategy': 'LATEST', 'file_prefix': ''}, 'task_name': 'test_1', 'partitions': [{'state': 'RUNNING', 'discard': 0, 'last_transfer_offset': 500, 'partitionId': 'shardId-0000000000', 'last_transfer_timestamp': 1538018072564}, {'state': 'RUNNING', 'discard': 0, 'last_transfer_offset': 500, 'partitionId': 'shardId-0000000001', 'last_transfer_timestamp': 1538018072564}]} 父主题: 使用SDK(Python)
  • Protobuf格式上传流式数据 参见初始化DIS客户端的操作初始化一个DIS客户端实例。 初始化DIS客户端,加入一项参数bodySerializeType,如下所示: cli = disclient(endpoint='', ak=os.environ.get("HUAWEICLOUD_SDK_AK"), sk=os.environ.get("HUAWEICLOUD_SDK_SK"), projectid='', region='',bodySerializeType='protobuf') 配置参数如下: 1 streamname="dis-test1" #已存在的通道名 参照该文件中的test方法,bodySerializeType="protobuf"选取protobuf格式上传。 protobuf_putRecords_sample.py文件中的protobuf_putRecords_test方法中的records为需要上传的数据内容,数据上传格式如下: 1234 records=[{"data": "abcdefd", "partition_id": “shardId-0000000001”}]#"data":"xxx"为上传的数据值,请自定义;“partition_id”:“shardId-0000000001”为数据写入的分区id值,请自定义。 record1 = {"data": "xxx","partition_id": partition_id} #可写入多条数据,数据格式如record1所示,每写一条数据使用下面的append方法传入records中。 配置好以上参数后,执行protobuf_putRecords_sample.py文件调用protobuf_putRecords_test方法,响应结果下: 12 200{'failed_record_count': 0, 'records': [{'partition_id': 'shardId-0000000001', 'sequence_number': '15'}]} 父主题: 使用SDK(Python)
  • 创建通道 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK创建DIS通道,需要指定通道名称、通道的分片数量以及通道类型。 其中,普通通道为“STREAM_TYPE_COMMON”,高级通道为“STREAM_TYPE_ADVANCED”。 1 2 3 4 5 6 7 8 9101112 CreateStreamRequest createStreamRequest = new CreateStreamRequest();// 通道名称String streamName = "myStream";createStreamRequest.setStreamName(streamName);// 通道类型: COMMON 普通通道; ADVANCED 高级通道createStreamRequest.setStreamType(StreamType.COMMON.name());// 通道的分片数量createStreamRequest.setPartitionCount(3);// 通道数据的保留时长: 单位小时,N*24,N的取值为1~7的整数createStreamRequest.setDataDuration(24);// 通道的源数据类型:缺省值:BLOBcreateStreamRequest.setDataType(DataTypeEnum.BLOB.name()); 配置“CreateStreamRequest”对象之后,通过调用createStream的方法创建通道。 1 dic.createStream(createStreamRequest); 父主题: 使用SDK(Java)
  • 删除转储任务 参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。 使用DIS SDK删除指定的转储任务。 1234567 DeleteTransferTaskRequest request = new DeleteTransferTaskRequest();// 配置转储任务所属的通道名称request.setStreamName(streamName);// 配置待删除的转储任务名称request.setTransferTaskName(taskName); 配置“DeleteTransferTaskRequest”对象之后,通过调用deleteTransferTask的方法删除转储任务。 1 dic.deleteTransferTask(request); 父主题: 使用SDK(Java)
  • 源数据类型JSON/ CS V 表1 转储相关配置参数 参数 说明 取值 任务名称 用户创建转储任务时,需要指定转储任务名称,同一通道的转储任务名称不可重复。任务名称由英文字母、数字、中划线和下划线组成。长度为1~64个字符。 - DLI数据库 单击“选择”,在“选择DLI数据库”窗口选择一个数据库。 此配置项仅支持选择,不可手动输入。 - DLI数据表 单击“选择”,在“选择DLI数据表”窗口选择一个数据表。仅支持数据位置为DLI类型的数据表,且用户需具有该表的插入权限。 此配置项仅支持选择,不可手动输入。 配置此项必须已配置“DLI 数据库”。 偏移量 最新:最大偏移量,即获取最新的有效数据。 最早:最小偏移量,即读取最早的有效数据。 最新 数据转储周期 根据用户配置的时间,周期性的将数据导入目的地(OBS,MRS,DLI,DWS),若某个时间段内无数据,则此时间段不会生成打包文件。 取值范围:30~900。 单位:秒。 默认配置为300秒。 - 数据临时桶 用户数据先临时存储在OBS桶中,再转储到指定的转储服务,转储完成后临时桶中的数据会被清除。 - 数据临时目录 需要转储的数据临时存储在OBS桶下此配置项配置的目录中,转储完成后临时目录中的数据会被清除。 配置为空时,数据直接存储在OBS桶内。 -
  • 运行程序 出现类似信息表示下载数据成功: 14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 214:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].
共100000条
提示

您即将访问非华为云网站,请注意账号财产安全