云服务器内容精选

  • 评估由配置变更触发的示例函数 Config服务检测到自定义合规规则范围内的资源发生更改时,会调用函数的示例如下: import time import http.client from huaweicloudsdkcore.auth.credentials import GlobalCredentials from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion from huaweicloudsdkconfig.v1.config_client import ConfigClient from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest ''' 合规规则评估逻辑: 返回“Compliant” 或 “NonCompliant” 本示例中, 当资源类型为ecs.cloudservers, 且该ecs的vpcId字段不是合规规则参数所指定的vpcId时, 会返回不合规, 否则返回合规。 ''' def evaluate_compliance(resource, parameter): if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers": return "Compliant" vpc_id = resource.get("properties", {}).get("metadata", {}).get("vpcId") return "Compliant" if vpc_id == parameter.get("vpcId") else "NonCompliant" def update_policy_state(context, domain_id, evaluation): auth = GlobalCredentials(ak=context.getAccessKey(), sk=context.getSecretKey(), domain_id=domain_id) client = ConfigClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \ .build() try: response = client.update_policy_state(evaluation) return 200 except ConnectionException as e: print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except RequestTimeoutException as e: print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except ServiceResponseException as e: print("There is service error, exception: ", e.status_code, e.error_msg) return e.status_code def handler(event, context): domain_id = event.get("domain_id") resource = event.get("invoking_event", {}) parameters = event.get("rule_parameter") compliance_state = evaluate_compliance(resource, parameters) request_body = UpdatePolicyStateRequest(PolicyStateRequestBody( policy_resource = PolicyResource( resource_id = resource.get("id"), resource_name = resource.get("name"), resource_provider = resource.get("provider"), resource_type = resource.get("type"), region_id = resource.get("region_id"), domain_id = domain_id ), trigger_type = event.get("trigger_type"), compliance_state = compliance_state, policy_assignment_id = event.get("policy_assignment_id"), policy_assignment_name = event.get("policy_assignment_name"), evaluation_time = event.get("evaluation_time"), evaluation_hash = event.get("evaluation_hash") )) for retry in range(5): status_code = update_policy_state(context, domain_id, request_body) if status_code == http.client.TOO_MANY_REQUESTS: print("TOO_MANY_REQUESTS: retry again") time.sleep(1) elif status_code == http.client.OK: print("Update policyState successfully.") break else: print("Failed to update policyState.") break