From fd72b722b2490b824f571a6de70e7d0e05fe87a0 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Wed, 7 Aug 2024 11:16:17 -0700 Subject: [PATCH 01/15] TC-IDM-2.2 automation --- src/python_testing/TC_IDM_2_2.py | 617 +++++++++++++++++++++++++++++++ 1 file changed, 617 insertions(+) create mode 100644 src/python_testing/TC_IDM_2_2.py diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py new file mode 100644 index 00000000000000..a185b2a38e8ed8 --- /dev/null +++ b/src/python_testing/TC_IDM_2_2.py @@ -0,0 +1,617 @@ +# +# Copyright (c) 2024 Project CHIP Authors +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +import inspect +from enum import IntFlag + +import chip.clusters as Clusters +import global_attribute_ids +from basic_composition_support import BasicCompositionTests +from chip.clusters import ClusterObjects as ClusterObjects +from chip.clusters.Attribute import AttributePath, TypedAttributePath +from chip.clusters.ClusterObjects import ClusterObject +from chip.clusters.enum import MatterIntEnum +from chip.interaction_model import InteractionModelError, Status +from chip.tlv import uint +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from mobly import asserts + + +class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): + + ROOT_NODE_ENDPOINT_ID = 0 + + @staticmethod + def get_typed_attribute_path(attribute: Clusters, ep: int = ROOT_NODE_ENDPOINT_ID): + return TypedAttributePath( + Path=AttributePath( + EndpointId=ep, + Attribute=attribute + ) + ) + + def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desired_type: type) -> list[ClusterObjects.ClusterAttributeDescriptor]: + all_attributes = [attribute for attribute in cluster.Attributes.__dict__.values() if inspect.isclass( + attribute) and issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor)] + + # Hackish way to get enums to return properly -- the default behavior (under else block) returns a BLANK LIST without this workaround + # If type(attribute.attribute_type.Type) or type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type are enums, they return , which are equal! + if desired_type == MatterIntEnum: + all_attributes_of_type = [attribute for attribute in all_attributes if type( + attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)] + else: + all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type == + ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)] + + return all_attributes_of_type + + def all_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, inverted: bool = False) -> list[ClusterObjects.ClusterAttributeDescriptor]: + if not inverted: + all_attributes = [(attribute, attribute.attribute_type) for attribute in cluster.Attributes.__dict__.values( + ) if inspect.isclass(attribute) and issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor)] + else: + all_attributes = [(attribute, attribute.attribute_type) for attribute in cluster.Attributes.__dict__.values( + ) if inspect.isclass(attribute) and not (issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor))] + return all_attributes + + def all_attributes(self, inverted: bool = False) -> list[ClusterObjects.ClusterAttributeDescriptor]: + cluster_ids = Clusters.ClusterObjects.ALL_ATTRIBUTES.keys() + attribute_list = [] + for cluster_id in cluster_ids: + + cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id] + cluster_attributes = self.all_attributes_for_cluster(cluster, inverted=inverted) + attribute_list.extend(cluster_attributes) + return attribute_list + + async def get_cluster_from_type(self, cluster_type: type) -> None: + for cluster in self.all_supported_clusters: + all_types = self.all_type_attributes_for_cluster(cluster, cluster_type) + if all_types: + chosen_cluster = all_types[0] + break + else: + print(f"Attribute not found on device: {cluster_type}") + chosen_cluster = None + + if chosen_cluster: + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [chosen_cluster]) + for endpoint in read_request: + nested_output = read_request[endpoint] + for cluster in nested_output: + attributes = nested_output[cluster] + asserts.assert_true(isinstance(attributes[chosen_cluster], cluster_type), + f"{chosen_cluster} is not a {cluster_type}") + return + + @async_test_body + async def test_TC_IDM_2_2(self): + + self.user_params["use_pase_only"] = False + super().setup_class() + await self.setup_class_helper() + # Test Setup + + all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] + + server_list_attr = Clusters.Objects.Descriptor.Attributes.ServerList + server_list_attr_path = [(0, server_list_attr)] + descriptor_obj = Clusters.Objects.Descriptor + descriptor_obj_path = [(0, descriptor_obj)] + attribute_list = Clusters.Objects.Descriptor.Attributes.AttributeList + attribute_list_path = [0, attribute_list] + self.all_supported_clusters = [cluster for cluster in Clusters.__dict__.values( + ) if inspect.isclass(cluster) and issubclass(cluster, ClusterObjects.Cluster)] + + # # For str test + # power_source = Clusters.Objects.PowerSource + + # # For uint test + # microwave_oven_mode = Clusters.Objects.MicrowaveOvenMode + + # # For int test + # device_energy_mgmt = Clusters.Objects.DeviceEnergyManagement + + # # For float test + # unit_testing = Clusters.Objects.UnitTesting + + # # For list test + # energy_evse_mode = Clusters.Objects.EnergyEvseMode + + # # For bitmap test + # occupancy_sensing = Clusters.Objects.OccupancySensing + + self.print_step(0, "Commissioning - already done") + + wildcard_descriptor = await self.default_controller.ReadAttribute(self.dut_node_id, [(Clusters.Descriptor)]) + endpoints = list(wildcard_descriptor.keys()) + + endpoints.sort() + # non_existent_endpoint = next(i for i, e in enumerate(endpoints + [None]) if i != e) + + # Step 1 + + # TH sends the Read Request Message to the DUT to read one attribute on a given cluster and endpoint. + # AttributePath = [[Endpoint = Specific Endpoint, Cluster = Specific ClusterID, Attribute = Specific Attribute]] + # On receipt of this message, DUT should send a report data action with the attribute value to the DUT + + self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") + + # endpoint = [(0, Clusters.Objects.Descriptor.Attributes.ServerList)] + read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) + returned_endpoints = read_request_1[0].keys() + # returned_endpoints = dict_keys([]) + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if ServerList is in nested output + asserts.assert_in(server_list_attr, read_request_1[0][descriptor_obj], "ServerList not in output") + + # Step 2 + # TH sends the Read Request Message to the DUT to read all attributes on a given cluster and Endpoint + # AttributePath = [[Endpoint = Specific Endpoint, Cluster = Specific ClusterID]] + # On receipt of this message, DUT should send a report data action with the attribute value to the DUT. + + self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") + # endpoint = [(0, Clusters.Objects.Descriptor)] + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) + returned_endpoints = read_request_2[0].keys() + # returned_endpoints = dict_keys([]) + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + + # Step 3 + + # TH sends the Read Request Message to the DUT to read an attribute from a cluster at all Endpoints + # AttributePath = [[Cluster = Specific ClusterID, Attribute = Specific Attribute]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. + self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") + # endpoint = [Clusters.Objects.Descriptor.Attributes.ServerList] + read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + for i in range(3): + returned_endpoints = read_request_3[i].keys() + + # returned_endpoints = dict_keys([]) + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if ServerList is in nested output + asserts.assert_in(server_list_attr, read_request_3[i][descriptor_obj], "ServerList not in output") + + # Step 4 + + # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at that Endpoint + # AttributePath = [[Endpoint = Specific Endpoint, Attribute = Specific Global Attribute]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. + self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") + + # endpoint = [0, Clusters.Objects.Descriptor.Attributes.AttributeList] # Is this a global attribute? Trial and error, but looks like it + read_request_4 = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) + for i in range(3): + returned_endpoints = read_request_4[i].keys() + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if AttributeList is in nested output + asserts.assert_in(attribute_list, read_request_4[i][descriptor_obj], "AttributeList not in output") + + # Step 5 + # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints + ### AttributePath = [[]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. + self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") + read_request_5 = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) + + for i in range(3): + returned_endpoints = read_request_5[i].keys() + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if AttributeList is in nested output + asserts.assert_in(attribute_list, read_request_5[i][descriptor_obj], "AttributeList not in output") + + # Step 6 + # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at all Endpoints + # AttributePath = [[Attribute = Specific Global Attribute]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. + self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") + # endpoint = [Clusters.Objects.Descriptor.Attributes.AttributeList] + read_request_6 = await self.default_controller.ReadAttribute(self.dut_node_id, [attribute_list]) + returned_endpoints = read_request_6[0].keys() + for i in range(3): + returned_endpoints = read_request_6[i].keys() + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if AttributeList is in nested output + asserts.assert_in(attribute_list, read_request_6[i][descriptor_obj], "AttributeList not in output") + + # Step 7 + # TH sends the Read Request Message to the DUT to read all attributes from a cluster at all Endpoints + # AttributePath = [[Cluster = Specific ClusterID]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. + self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") + # endpoint = [Clusters.Objects.Descriptor] + read_request_7 = await self.default_controller.ReadAttribute(self.dut_node_id, [descriptor_obj]) + returned_endpoints = read_request_7[0].keys() + for i in range(3): + returned_endpoints = read_request_7[i].keys() + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if AttributeList is in nested output + asserts.assert_in(attribute_list, read_request_7[i][descriptor_obj], "AttributeList not in output") + + # Step 8 + # TH sends the Read Request Message to the DUT to read all attributes from all clusters at one Endpoint + # AttributePath = [[Endpoint = Specific Endpoint]] + # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. + self.print_step(8, "Send Request Message to read all attributes from all clusters at one endpoint") + read_request_8 = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) + returned_endpoints = read_request_8[0].keys() + + # Check if chip.clusters.Objects.Descriptor is in output + asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") + # Check if ServerList is in nested output + asserts.assert_in(server_list_attr, read_request_1[0][descriptor_obj], "ServerList not in output") + + # Step 9 + # TH sends the Read Request Message to the DUT to read an attribute of data type bool. + # If the device does not have an attribute of data type bool, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + + self.print_step(9, "Read a read request of attribute type bool") + await self.get_cluster_from_type(bool) + + # Step 10 + # TH sends the Read Request Message to the DUT to read an attribute of data type string. + # If the device does not have an attribute of data type string, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(10, "Read a read request of attribute type string") + await self.get_cluster_from_type(str) + + # Step 11 + # TH sends the Read Request Message to the DUT to read an attribute of data type unsigned integer. + # If the device does not have an attribute of data type unsigned integer, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(11, "Read a read request of attribute type unsigned integer") + await self.get_cluster_from_type(uint) + + # Step 12 + # TH sends the Read Request Message to the DUT to read an attribute of data type signed integer. + # If the device does not have an attribute of data type signed integer, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(12, "Read a read request of attribute type signed integer") + await self.get_cluster_from_type(int) + + # Step 13 + # TH sends the Read Request Message to the DUT to read an attribute of data type floating point. + # If the device does not have an attribute of data type floating point, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(13, "Read a read request of attribute type floating point") + await self.get_cluster_from_type(float) + + # Step 14 + # TH sends the Read Request Message to the DUT to read an attribute of data type Octet String. + # If the device does not have an attribute of data type octet string, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(14, "Read a read request of attribute type octet string") + await self.get_cluster_from_type(bytes) + + # Step 15 + # TH sends the Read Request Message to the DUT to read an attribute of data type Struct. + # If the device does not have an attribute of data type struct, skip this step. + + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(15, "Read a read request of attribute type struct") + await self.get_cluster_from_type(ClusterObject) + + # Step 16 + # TH sends the Read Request Message to the DUT to read an attribute of data type List. + # If the device does not have an attribute of data type list, skip this step. + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(16, "Read a read request of attribute type list") + await self.get_cluster_from_type(list) + + # Step 17 + # TH sends the Read Request Message to the DUT to read an attribute of data type enum. + # If the device does not have an attribute of data type enum, skip this step. + # Verify on the TH that the DUT returns data from the expected attribute path. + self.print_step(17, "Read a read request of attribute type enum") + await self.get_cluster_from_type(MatterIntEnum) + + # Step 18 + # TH sends the Read Request Message to the DUT to read an attribute of data type bitmap. + # If the device does not have an attribute of data type bitmap, skip this step. + self.print_step(18, "Read a read request of attribute type bitmap") + await self.get_cluster_from_type(IntFlag) + + # Step 19 + # TH sends the Read Request Message to the DUT to read any attribute to an unsupported Endpoint. + # DUT responds with the report data action. + # Verify on the TH that the DUT sends the status code UNSUPPORTED_ENDPOINT + + self.print_step(19, "Send the Read Request Message to the DUT to read any attribute to an unsupported Endpoint") + supported_endpoints = set(self.endpoints.keys()) + all_endpoints = set(range(max(supported_endpoints)+2)) + unsupported = list(all_endpoints - supported_endpoints) + # Read descriptor + result = await self.read_single_attribute_expect_error(endpoint=unsupported[0], cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.FeatureMap, error=Status.UnsupportedEndpoint) + asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid endpoint") + + # Seems to return only {} -- Is this a failure or the unintended behavior? + # read_request_19 = await self.default_controller.ReadAttribute(self.dut_node_id, [non_existent_endpoint]) + + # Step 20 + # TH sends the Read Request Message to the DUT to read any attribute to an unsupported cluster. + # DUT responds with the report data action. + + # Verify on the TH that the DUT sends the status code UNSUPPORTED_CLUSTER + self.print_step(20, "Send the Read Request Message to the DUT to read any attribute to an unsupported cluster") + + for endpoint_id, endpoint in self.endpoints.items(): + print(endpoint_id, endpoint) + for cluster_type, cluster in endpoint.items(): + if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + continue + + all_clusters = set(list(ClusterObjects.ALL_CLUSTERS.keys())) + dut_clusters = set(list(x.id for x in endpoint.keys())) + + unsupported = [id for id in list(all_clusters - dut_clusters) if global_attribute_ids.attribute_id_type(id) + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + + unsupported_attribute = (ClusterObjects.ALL_ATTRIBUTES[unsupported[0]])[0] + + if unsupported: + result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=ClusterObjects.ALL_CLUSTERS[unsupported[0]], attribute=unsupported_attribute, error=Status.UnsupportedCluster) + asserts.assert_true(isinstance(result.Reason, InteractionModelError), + msg="Unexpected success reading invalid cluster") + + # Step 21 + # TH sends the Read Request Message to the DUT to read an unsupported attribute + # DUT responds with the report data action. + self.print_step(21, "Send the Read Request Message to the DUT to read any attribute to an unsupported attribute") + + for endpoint_id, endpoint in self.endpoints.items(): + for cluster_type, cluster in endpoint.items(): + if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + continue + + all_attrs = set(list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())) + dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) + + unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + + if unsupported: + result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=cluster_type, attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) + asserts.assert_true(isinstance(result.Reason, InteractionModelError), + msg="Unexpected success reading invalid attribute") + + # all_attributes = self.all_attributes() + # all_attributes_set = set(x[0] for x in all_attributes) + + # supported_attributes = [] + # for cluster in read_request_5.values(): + # cluster_attributes = [list(attr.keys()) for attr in cluster.values()] + # for cluster_obj in cluster_attributes: + # supported_attributes.extend(cluster_obj) + + # supported_attributes_set = set(supported_attributes) + # all_attributes_set - supported_attributes_set + # unsupported_attributes_set = all_attributes_set - supported_attributes_set + # unsupported_attribute = list(unsupported_attributes_set)[0] + + # Seems to return only {} (like unsupported endpoints and clusters) + # read_request_21 = await self.default_controller.ReadAttribute(self.dut_node_id, [unsupported_attribute]) + + # Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE + + # Step 22 + # TH sends the Read Request Message to the DUT to read an attribute + # Repeat the above steps 3 times + + # On the TH verify the received Report data message has the right attribute values for all the 3 times. + self.print_step(22, "Send the Read Request Message to the DUT 3 timesand check if they are correct each time") + read_request_22_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_22_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_22_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + asserts.assert_equal(read_request_22_1, read_request_22_2) + asserts.assert_equal(read_request_22_2, read_request_22_3) + + # Step 23 + # TH sends a Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set. + # DUT sends back the attribute value with the DataVersion of the cluster. + # TH sends a second read request to the same cluster with the DataVersionFilter Field set with the dataversion value received before. + self.print_step( + 23, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") + # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly + # read_request_23 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) + # data_version = read_request_23[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version_filter = [(0, Clusters.Descriptor, data_version)] + + # read_request_23_2 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path, dataVersionFilters=data_version_filter) + # Seems to return {}? + + # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. + + # Step 24 + # TH sends a Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set. + # DUT sends back the attribute value with the DataVersion of the cluster. + # TH sends a write request to the same cluster to write to any attribute. + # TH sends a second read request to read an attribute from the same cluster with the DataVersionFilter Field set with the dataversion value received before. + + self.print_step( + 24, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") + # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly + # read_request_24 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)]) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=123456))]) + + # data_version = read_request_24[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version_filter = [(0, Clusters.Descriptor, data_version)] + # read_request_24_1 = await self.default_controller.ReadAttribute( + # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) + # Seems to return {}? + + # DUT should send a report data action with the attribute value to the TH. + + # Step 25 + + # TH sends a Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set. + # DUT sends back the all the attribute values with the DataVersion of the cluster. + # TH sends a write request to the same cluster to write to any attribute. + # TH sends a second read request to read all the attributes from the same cluster with the DataVersionFilter Field set with the dataversion value received before. + + # DUT should send a report data action with all the attribute values to the TH. + self.print_step( + 25, "Send the Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set") + # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly + # read_request_25 = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=654321))]) + # data_version = read_request_25[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version_filter = [(0, Clusters.Descriptor, data_version)] + + # read_request_25_2 = await self.default_controller.ReadAttribute( + # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) + # Seems to return {}? + + # Step 26 + # TH sends a Read Request Message to the DUT to read a particular attribute on a particular cluster with the DataVersionFilter Field not set. + # DUT sends back the attribute value with the DataVersion of the cluster. + # TH sends a read request to the same cluster to read any attribute with the right DataVersion(received in the previous step) and also an older DataVersion. + # The Read Request Message should have 2 DataVersionIB filters. + + # DUT should send a report data action with the attribute value to the TH. + self.print_step( + 26, "Send the Read Request Message to read a particular attribute on a particular cluster with the DataVersionFilter Field not set") + # Temporarily commented to avoid linter errors + # read_request_26 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=999))]) + # data_version = read_request_26[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly + # data_version_filter = [(0, Clusters.Descriptor, data_version)] + # read_request_26_2 = await self.default_controller.ReadAttribute( + # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) + # data_version_filter_2 = [(0, Clusters.Descriptor, data_version-1)] + # read_request_26_3 = await self.default_controller.ReadAttribute( + # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter_2) + + # Step 27 + + # TH sends a Read Request Message to the DUT to read any supported attribute/wildcard on a particular cluster say A with the DataVersionFilter Field not set. + # DUT sends back the attribute value with the DataVersion of the cluster A. + # TH sends a Read Request Message to read any supported attribute/wildcard on cluster A and any supported attribute/wildcard on another cluster B. + # DataVersionList field should only contain the DataVersion of cluster A. + + # Verify that the DUT sends a report data action with the attribute value from the cluster B to the TH. + # Verify that the DUT does not send the attribute value from cluster A. + self.print_step( + 27, "Send the Read Request Message to read any supported attribute/wildcard on a particular cluster say A with the DataVersionFilter Field not set") + read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + + data_version_1 = read_request_27_1_1[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + + # data_version_filter_1 = [(0, Clusters.Descriptor, data_version_1)] + + read_request_27_2_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)]) + data_version_2 = read_request_27_2_1[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + # data_version_filter_2 = [(0, Clusters.Descriptor, data_version_2)] + + asserts.assert_not_equal(data_version_1, data_version_2) + + # Step 28 + + # TH sends a Read Request Message to the DUT to read something(Attribute) which is larger than 1 MTU(1280 bytes) and per spec can be chunked + + + # Verify on the TH that the DUT sends a chunked data message with the SuppressResponse field set to False for all the messages except the last one. + # Verify the last chunked message sent has the SuppressResponse field set to True. + + # self.print_step(28, "Send the Read Request Message to read something(Attribute) which is larger than 1 MTU(1280 bytes) and per spec can be chunked +") + # This apparently already exists in TCP tests -- remove? + + # Step 29 + + # TH sends a Read Request Message to the DUT to read a non global attribute from all clusters at that Endpoint + # AttributePath = [[Endpoint = Specific Endpoint, Attribute = Specific Non Global Attribute]] + + + # On the TH verify that the DUT sends an error message and not the value of the attribute. + self.print_step(29, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at that Endpoint") + found_non_global = False + for endpoint_id, endpoint in self.endpoints.items(): + if not found_non_global: + # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app + # But kTest does exist -> Clusters.Objects.UnitTesting + for cluster_type, cluster in endpoint.items(): + + if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: + found_non_global = True + + non_global_attr = list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].values())[0] + output = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_attr)]) + asserts.assert_true(isinstance(output[0][cluster_type][non_global_attr].Reason, InteractionModelError), + msg="Unexpected success reading non-global attribute") + continue + + # Step 30 + + # TH sends a Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints + # AttributePath = [[Attribute = Specific Non Global Attribute]] + + + # On the TH verify that the DUT sends an error message and not the value of the attribute. + self.print_step(30, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints") + found_non_global = False + for endpoint_id, endpoint in self.endpoints.items(): + if not found_non_global: + # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app + # But kTest does exist -> Clusters.Objects.UnitTesting + for cluster_type, cluster in endpoint.items(): + + if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: + found_non_global = True + continue + # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly + # non_global_cluster = cluster_type + # read_request_30 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) + # Seems to return {}? + # Step 31 + + # TH should have access to only a single cluster at one Endpoint1. + # TH sends a Read Request Message to the DUT to read all attributes from all clusters at Endpoint1 + # AttributePath = [[Endpoint = Specific Endpoint]] + + + # Verify that the DUT sends back data of all attributes only from that one cluster to which it has access. + # Verify that there are no errors sent back for attributes the TH has no access to. + self.print_step(31, "Send the Read Request Message to the DUT to read all attributes from all clusters at Endpoint1") + read_request_31 = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, descriptor_obj)]) + asserts.assert_true(1 in read_request_31, "Endpoint 1 missing in response") + asserts.assert_true(descriptor_obj in read_request_31[1], "Clusters.Objects.Descriptor not in response") + + # Step 32 + + # TH sends a Read Request Message to read all events and attributes from the DUT. + + # Verify that the DUT sends back data of all attributes and events that the TH has access to. + + # Clusters.Descriptor.Events doesn't seem to exist, despite https://project-chip.github.io/connectedhomeip-doc/testing/python.html#events suggesting it should + + +if __name__ == "__main__": + default_matter_test_main() From e6757da942e3595992dd63bf1936c001df0ff273 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 7 Aug 2024 18:19:11 +0000 Subject: [PATCH 02/15] Restyled by autopep8 --- src/python_testing/TC_IDM_2_2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index a185b2a38e8ed8..e2751a7664d71f 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -462,7 +462,7 @@ async def test_TC_IDM_2_2(self): # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly # read_request_24 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)]) await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=123456))]) - + # data_version = read_request_24[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] # data_version_filter = [(0, Clusters.Descriptor, data_version)] # read_request_24_1 = await self.default_controller.ReadAttribute( From a1c592aeac33156396cc4bf117ffdc03416c14a1 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Mon, 12 Aug 2024 16:23:12 -0700 Subject: [PATCH 03/15] Addressing feedback from PR (part 1) --- src/python_testing/TC_IDM_2_2.py | 258 ++++++++++++++----------------- 1 file changed, 113 insertions(+), 145 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index e2751a7664d71f..a569044c41b7fe 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -14,6 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. # +# + +# See https://github.com/project-chip/connectedhomeip/blob/master/docs/testing/python.md#defining-the-ci-test-arguments +# for details about the block below. +# +# === BEGIN CI TEST ARGUMENTS === +# test-runner-runs: run1 +# test-runner-run/run1/app: ${ALL_CLUSTERS_APP} +# test-runner-run/run1/factoryreset: True +# test-runner-run/run1/quiet: True +# test-runner-run/run1/app-args: --discriminator 1234 --KVS kvs1 --trace-to json:${TRACE_APP}.json +# test-runner-run/run1/script-args: --storage-path admin_storage.json --commissioning-method on-network --discriminator 1234 --passcode 20202021 --trace-to json:${TRACE_TEST_JSON}.json --trace-to perfetto:${TRACE_TEST_PERFETTO}.perfetto +# === END CI TEST ARGUMENTS === import inspect @@ -37,7 +50,7 @@ class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): ROOT_NODE_ENDPOINT_ID = 0 @staticmethod - def get_typed_attribute_path(attribute: Clusters, ep: int = ROOT_NODE_ENDPOINT_ID): + def get_typed_attribute_path(attribute: Clusters, ep: ClusterObjects.ClusterAttributeDescriptor = ROOT_NODE_ENDPOINT_ID): return TypedAttributePath( Path=AttributePath( EndpointId=ep, @@ -57,56 +70,46 @@ def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desir else: all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)] - return all_attributes_of_type - def all_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, inverted: bool = False) -> list[ClusterObjects.ClusterAttributeDescriptor]: - if not inverted: - all_attributes = [(attribute, attribute.attribute_type) for attribute in cluster.Attributes.__dict__.values( - ) if inspect.isclass(attribute) and issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor)] - else: - all_attributes = [(attribute, attribute.attribute_type) for attribute in cluster.Attributes.__dict__.values( - ) if inspect.isclass(attribute) and not (issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor))] - return all_attributes - - def all_attributes(self, inverted: bool = False) -> list[ClusterObjects.ClusterAttributeDescriptor]: - cluster_ids = Clusters.ClusterObjects.ALL_ATTRIBUTES.keys() - attribute_list = [] - for cluster_id in cluster_ids: - - cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id] - cluster_attributes = self.all_attributes_for_cluster(cluster, inverted=inverted) - attribute_list.extend(cluster_attributes) - return attribute_list - - async def get_cluster_from_type(self, cluster_type: type) -> None: - for cluster in self.all_supported_clusters: - all_types = self.all_type_attributes_for_cluster(cluster, cluster_type) + def all_device_clusters(self) -> set: + device_clusters = set() + for endpoint in self.endpoints: + device_clusters |= set(self.endpoints[endpoint].keys()) + return device_clusters + + async def get_cluster_from_type(self, desired_attribute_type: type) -> None: + # Get all clusters from device + + for cluster in self.device_clusters: + all_types = self.all_type_attributes_for_cluster(cluster, desired_attribute_type) if all_types: - chosen_cluster = all_types[0] + chosen_attribute = all_types[0] + chosen_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_attribute.cluster_id] break else: - print(f"Attribute not found on device: {cluster_type}") + print(f"Attribute not found on device: {desired_attribute_type}") chosen_cluster = None - if chosen_cluster: - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [chosen_cluster]) - for endpoint in read_request: - nested_output = read_request[endpoint] - for cluster in nested_output: - attributes = nested_output[cluster] - asserts.assert_true(isinstance(attributes[chosen_cluster], cluster_type), - f"{chosen_cluster} is not a {cluster_type}") + endpoint = None + for endpoint in self.endpoints: + if (chosen_cluster in self.endpoints[endpoint]) and (chosen_attribute in self.endpoints[endpoint][chosen_cluster]): + break + + if chosen_cluster and (endpoint is not None): + output = await self.read_single_attribute_check_success( + endpoint=endpoint, + dev_ctrl=self.default_controller, + cluster=chosen_cluster, + attribute=chosen_attribute) + return output return @async_test_body async def test_TC_IDM_2_2(self): - - self.user_params["use_pase_only"] = False - super().setup_class() - await self.setup_class_helper() # Test Setup - + await self.setup_class_helper(default_to_pase=False) + all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] server_list_attr = Clusters.Objects.Descriptor.Attributes.ServerList @@ -115,34 +118,16 @@ async def test_TC_IDM_2_2(self): descriptor_obj_path = [(0, descriptor_obj)] attribute_list = Clusters.Objects.Descriptor.Attributes.AttributeList attribute_list_path = [0, attribute_list] + self.device_clusters = self.all_device_clusters() self.all_supported_clusters = [cluster for cluster in Clusters.__dict__.values( ) if inspect.isclass(cluster) and issubclass(cluster, ClusterObjects.Cluster)] - # # For str test - # power_source = Clusters.Objects.PowerSource - - # # For uint test - # microwave_oven_mode = Clusters.Objects.MicrowaveOvenMode - - # # For int test - # device_energy_mgmt = Clusters.Objects.DeviceEnergyManagement - - # # For float test - # unit_testing = Clusters.Objects.UnitTesting - - # # For list test - # energy_evse_mode = Clusters.Objects.EnergyEvseMode - - # # For bitmap test - # occupancy_sensing = Clusters.Objects.OccupancySensing - self.print_step(0, "Commissioning - already done") wildcard_descriptor = await self.default_controller.ReadAttribute(self.dut_node_id, [(Clusters.Descriptor)]) endpoints = list(wildcard_descriptor.keys()) endpoints.sort() - # non_existent_endpoint = next(i for i, e in enumerate(endpoints + [None]) if i != e) # Step 1 @@ -152,14 +137,14 @@ async def test_TC_IDM_2_2(self): self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") - # endpoint = [(0, Clusters.Objects.Descriptor.Attributes.ServerList)] - read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) - returned_endpoints = read_request_1[0].keys() - # returned_endpoints = dict_keys([]) + + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) + returned_endpoints = read_request[0].keys() + # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if ServerList is in nested output - asserts.assert_in(server_list_attr, read_request_1[0][descriptor_obj], "ServerList not in output") + asserts.assert_in(server_list_attr, read_request[0][descriptor_obj], "ServerList not in output") # Step 2 # TH sends the Read Request Message to the DUT to read all attributes on a given cluster and Endpoint @@ -167,10 +152,10 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value to the DUT. self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") - # endpoint = [(0, Clusters.Objects.Descriptor)] - read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) - returned_endpoints = read_request_2[0].keys() - # returned_endpoints = dict_keys([]) + + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) + returned_endpoints = read_request[0].keys() + # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") @@ -179,17 +164,19 @@ async def test_TC_IDM_2_2(self): # TH sends the Read Request Message to the DUT to read an attribute from a cluster at all Endpoints # AttributePath = [[Cluster = Specific ClusterID, Attribute = Specific Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. + + # The number of endpoints needs to be read from the device. They're also not always sequential. This should come from the descriptor cluster parts list on EP0 self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") - # endpoint = [Clusters.Objects.Descriptor.Attributes.ServerList] - read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - for i in range(3): - returned_endpoints = read_request_3[i].keys() + all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) + + endpoint_list = list(all_attributes) + for endpoint in endpoint_list: - # returned_endpoints = dict_keys([]) - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if ServerList is in nested output - asserts.assert_in(server_list_attr, read_request_3[i][descriptor_obj], "ServerList not in output") + cluster = list(all_attributes[endpoint].keys())[0] + attribute = list(all_attributes[endpoint][cluster])[1] + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [endpoint, attribute]) + print(f"Output: {read_request[endpoint][cluster][attribute]}") + asserts.assert_true(read_request[endpoint][cluster][attribute] is not None, f"Value not present in attribute: {attribute}") # Step 4 @@ -199,77 +186,76 @@ async def test_TC_IDM_2_2(self): self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") # endpoint = [0, Clusters.Objects.Descriptor.Attributes.AttributeList] # Is this a global attribute? Trial and error, but looks like it - read_request_4 = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) + all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) + # import pdb;pdb.set_trace() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) for i in range(3): - returned_endpoints = read_request_4[i].keys() + returned_endpoints = read_request[i].keys() # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request_4[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints ### AttributePath = [[]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") - read_request_5 = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) for i in range(3): - returned_endpoints = read_request_5[i].keys() + returned_endpoints = read_request[i].keys() # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request_5[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") # Step 6 # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at all Endpoints # AttributePath = [[Attribute = Specific Global Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") - # endpoint = [Clusters.Objects.Descriptor.Attributes.AttributeList] - read_request_6 = await self.default_controller.ReadAttribute(self.dut_node_id, [attribute_list]) - returned_endpoints = read_request_6[0].keys() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [attribute_list]) + returned_endpoints = read_request[0].keys() for i in range(3): - returned_endpoints = read_request_6[i].keys() + returned_endpoints = read_request[i].keys() # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request_6[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") # Step 7 # TH sends the Read Request Message to the DUT to read all attributes from a cluster at all Endpoints # AttributePath = [[Cluster = Specific ClusterID]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") - # endpoint = [Clusters.Objects.Descriptor] - read_request_7 = await self.default_controller.ReadAttribute(self.dut_node_id, [descriptor_obj]) - returned_endpoints = read_request_7[0].keys() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [descriptor_obj]) + returned_endpoints = read_request[0].keys() for i in range(3): - returned_endpoints = read_request_7[i].keys() + returned_endpoints = read_request[i].keys() # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request_7[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") # Step 8 # TH sends the Read Request Message to the DUT to read all attributes from all clusters at one Endpoint # AttributePath = [[Endpoint = Specific Endpoint]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(8, "Send Request Message to read all attributes from all clusters at one endpoint") - read_request_8 = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) - returned_endpoints = read_request_8[0].keys() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) + returned_endpoints = read_request[0].keys() # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if ServerList is in nested output - asserts.assert_in(server_list_attr, read_request_1[0][descriptor_obj], "ServerList not in output") + asserts.assert_in(server_list_attr, read_request[0][descriptor_obj], "ServerList not in output") # Step 9 # TH sends the Read Request Message to the DUT to read an attribute of data type bool. # If the device does not have an attribute of data type bool, skip this step. # Verify on the TH that the DUT returns data from the expected attribute path. - self.print_step(9, "Read a read request of attribute type bool") await self.get_cluster_from_type(bool) @@ -355,7 +341,7 @@ async def test_TC_IDM_2_2(self): asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid endpoint") # Seems to return only {} -- Is this a failure or the unintended behavior? - # read_request_19 = await self.default_controller.ReadAttribute(self.dut_node_id, [non_existent_endpoint]) + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [non_existent_endpoint]) # Step 20 # TH sends the Read Request Message to the DUT to read any attribute to an unsupported cluster. @@ -365,7 +351,6 @@ async def test_TC_IDM_2_2(self): self.print_step(20, "Send the Read Request Message to the DUT to read any attribute to an unsupported cluster") for endpoint_id, endpoint in self.endpoints.items(): - print(endpoint_id, endpoint) for cluster_type, cluster in endpoint.items(): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: continue @@ -393,33 +378,16 @@ async def test_TC_IDM_2_2(self): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: continue - all_attrs = set(list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())) - dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) + all_attrs = set(list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())) + dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) - unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) - == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] - if unsupported: - result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=cluster_type, attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) - asserts.assert_true(isinstance(result.Reason, InteractionModelError), - msg="Unexpected success reading invalid attribute") - - # all_attributes = self.all_attributes() - # all_attributes_set = set(x[0] for x in all_attributes) - - # supported_attributes = [] - # for cluster in read_request_5.values(): - # cluster_attributes = [list(attr.keys()) for attr in cluster.values()] - # for cluster_obj in cluster_attributes: - # supported_attributes.extend(cluster_obj) - - # supported_attributes_set = set(supported_attributes) - # all_attributes_set - supported_attributes_set - # unsupported_attributes_set = all_attributes_set - supported_attributes_set - # unsupported_attribute = list(unsupported_attributes_set)[0] - - # Seems to return only {} (like unsupported endpoints and clusters) - # read_request_21 = await self.default_controller.ReadAttribute(self.dut_node_id, [unsupported_attribute]) + if unsupported: + result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=cluster_type, attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) + asserts.assert_true(isinstance(result.Reason, InteractionModelError), + msg="Unexpected success reading invalid attribute") # Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE @@ -429,11 +397,11 @@ async def test_TC_IDM_2_2(self): # On the TH verify the received Report data message has the right attribute values for all the 3 times. self.print_step(22, "Send the Read Request Message to the DUT 3 timesand check if they are correct each time") - read_request_22_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - read_request_22_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - read_request_22_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - asserts.assert_equal(read_request_22_1, read_request_22_2) - asserts.assert_equal(read_request_22_2, read_request_22_3) + read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + asserts.assert_equal(read_request_1, read_request_2) + asserts.assert_equal(read_request_2, read_request_3) # Step 23 # TH sends a Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set. @@ -442,11 +410,11 @@ async def test_TC_IDM_2_2(self): self.print_step( 23, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request_23 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) - # data_version = read_request_23[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) + # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_23_2 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path, dataVersionFilters=data_version_filter) + # read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path, dataVersionFilters=data_version_filter) # Seems to return {}? # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. @@ -460,12 +428,12 @@ async def test_TC_IDM_2_2(self): self.print_step( 24, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request_24 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)]) + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)]) await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=123456))]) - # data_version = read_request_24[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_24_1 = await self.default_controller.ReadAttribute( + # read_request = await self.default_controller.ReadAttribute( # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) # Seems to return {}? @@ -482,12 +450,12 @@ async def test_TC_IDM_2_2(self): self.print_step( 25, "Send the Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set") # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request_25 = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=654321))]) - # data_version = read_request_25[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_25_2 = await self.default_controller.ReadAttribute( + # read_request = await self.default_controller.ReadAttribute( # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) # Seems to return {}? @@ -501,15 +469,15 @@ async def test_TC_IDM_2_2(self): self.print_step( 26, "Send the Read Request Message to read a particular attribute on a particular cluster with the DataVersionFilter Field not set") # Temporarily commented to avoid linter errors - # read_request_26 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=999))]) - # data_version = read_request_26[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_26_2 = await self.default_controller.ReadAttribute( + # read_request_2 = await self.default_controller.ReadAttribute( # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) # data_version_filter_2 = [(0, Clusters.Descriptor, data_version-1)] - # read_request_26_3 = await self.default_controller.ReadAttribute( + # read_request_3 = await self.default_controller.ReadAttribute( # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter_2) # Step 27 @@ -589,7 +557,7 @@ async def test_TC_IDM_2_2(self): continue # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly # non_global_cluster = cluster_type - # read_request_30 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) # Seems to return {}? # Step 31 @@ -600,9 +568,9 @@ async def test_TC_IDM_2_2(self): # Verify that the DUT sends back data of all attributes only from that one cluster to which it has access. # Verify that there are no errors sent back for attributes the TH has no access to. self.print_step(31, "Send the Read Request Message to the DUT to read all attributes from all clusters at Endpoint1") - read_request_31 = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, descriptor_obj)]) - asserts.assert_true(1 in read_request_31, "Endpoint 1 missing in response") - asserts.assert_true(descriptor_obj in read_request_31[1], "Clusters.Objects.Descriptor not in response") + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, descriptor_obj)]) + asserts.assert_true(1 in read_request, "Endpoint 1 missing in response") + asserts.assert_true(descriptor_obj in read_request[1], "Clusters.Objects.Descriptor not in response") # Step 32 From 64f4a388dff6451f0c05c01a269ec56becebf0bf Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Mon, 12 Aug 2024 23:24:32 +0000 Subject: [PATCH 04/15] Restyled by autopep8 --- src/python_testing/TC_IDM_2_2.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index a569044c41b7fe..9e9486fe4dd9fa 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -80,7 +80,7 @@ def all_device_clusters(self) -> set: async def get_cluster_from_type(self, desired_attribute_type: type) -> None: # Get all clusters from device - + for cluster in self.device_clusters: all_types = self.all_type_attributes_for_cluster(cluster, desired_attribute_type) if all_types: @@ -109,7 +109,7 @@ async def get_cluster_from_type(self, desired_attribute_type: type) -> None: async def test_TC_IDM_2_2(self): # Test Setup await self.setup_class_helper(default_to_pase=False) - + all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] server_list_attr = Clusters.Objects.Descriptor.Attributes.ServerList @@ -137,7 +137,6 @@ async def test_TC_IDM_2_2(self): self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) returned_endpoints = read_request[0].keys() @@ -152,10 +151,10 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value to the DUT. self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") - + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) returned_endpoints = read_request[0].keys() - + # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") @@ -168,7 +167,7 @@ async def test_TC_IDM_2_2(self): # The number of endpoints needs to be read from the device. They're also not always sequential. This should come from the descriptor cluster parts list on EP0 self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) - + endpoint_list = list(all_attributes) for endpoint in endpoint_list: @@ -382,7 +381,7 @@ async def test_TC_IDM_2_2(self): dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) - == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] if unsupported: result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=cluster_type, attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) From cc34e02f8b1857ae20153cf0a429279047d9141d Mon Sep 17 00:00:00 2001 From: austina-csa Date: Thu, 15 Aug 2024 16:40:13 -0700 Subject: [PATCH 05/15] Addressing feedback for IDM 2-2 (part 2) --- src/python_testing/TC_IDM_2_2.py | 92 +++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 9e9486fe4dd9fa..f63b3895fd44c8 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -36,29 +36,18 @@ import global_attribute_ids from basic_composition_support import BasicCompositionTests from chip.clusters import ClusterObjects as ClusterObjects -from chip.clusters.Attribute import AttributePath, TypedAttributePath +from chip.clusters.Attribute import AttributePath from chip.clusters.ClusterObjects import ClusterObject from chip.clusters.enum import MatterIntEnum from chip.interaction_model import InteractionModelError, Status from chip.tlv import uint from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main -from mobly import asserts +from mobly import asserts, signals class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): - ROOT_NODE_ENDPOINT_ID = 0 - - @staticmethod - def get_typed_attribute_path(attribute: Clusters, ep: ClusterObjects.ClusterAttributeDescriptor = ROOT_NODE_ENDPOINT_ID): - return TypedAttributePath( - Path=AttributePath( - EndpointId=ep, - Attribute=attribute - ) - ) - - def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desired_type: type) -> list[ClusterObjects.ClusterAttributeDescriptor]: + async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desired_type: type) -> list[ClusterObjects.ClusterAttributeDescriptor]: all_attributes = [attribute for attribute in cluster.Attributes.__dict__.values() if inspect.isclass( attribute) and issubclass(attribute, ClusterObjects.ClusterAttributeDescriptor)] @@ -67,6 +56,14 @@ def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desir if desired_type == MatterIntEnum: all_attributes_of_type = [attribute for attribute in all_attributes if type( attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)] + elif desired_type == IntFlag: + try: + feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) + except signals.TestFailure: + print(f"{cluster} does not support Attributes.FeatureMap") + return [] + if feature_map >= 1: + return [cluster.Attributes.FeatureMap] else: all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)] @@ -78,11 +75,11 @@ def all_device_clusters(self) -> set: device_clusters |= set(self.endpoints[endpoint].keys()) return device_clusters - async def get_cluster_from_type(self, desired_attribute_type: type) -> None: + async def check_attribute_read_for_type(self, desired_attribute_type: type) -> None: # Get all clusters from device for cluster in self.device_clusters: - all_types = self.all_type_attributes_for_cluster(cluster, desired_attribute_type) + all_types = await self.all_type_attributes_for_cluster(cluster, desired_attribute_type) if all_types: chosen_attribute = all_types[0] chosen_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_attribute.cluster_id] @@ -153,11 +150,17 @@ async def test_TC_IDM_2_2(self): self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) - returned_endpoints = read_request[0].keys() - + # Check that endpoint 0 is in output + asserts.assert_in(0, read_request, "Endpoint 0 not in output") # Check if chip.clusters.Objects.Descriptor is in output + returned_endpoints = read_request[0].keys() asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - + returned_attributes = list(read_request[0][descriptor_obj].keys()) + if returned_attributes[0] == Clusters.Attribute.DataVersion: + returned_descriptor_attributes = returned_attributes[1:] + expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[descriptor_obj.id] + # Actual failure + asserts.assert_equal(set(returned_descriptor_attributes), set(expected_descriptor_attributes.values())) # Step 3 # TH sends the Read Request Message to the DUT to read an attribute from a cluster at all Endpoints @@ -174,7 +177,6 @@ async def test_TC_IDM_2_2(self): cluster = list(all_attributes[endpoint].keys())[0] attribute = list(all_attributes[endpoint][cluster])[1] read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [endpoint, attribute]) - print(f"Output: {read_request[endpoint][cluster][attribute]}") asserts.assert_true(read_request[endpoint][cluster][attribute] is not None, f"Value not present in attribute: {attribute}") # Step 4 @@ -186,15 +188,16 @@ async def test_TC_IDM_2_2(self): # endpoint = [0, Clusters.Objects.Descriptor.Attributes.AttributeList] # Is this a global attribute? Trial and error, but looks like it all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) - # import pdb;pdb.set_trace() + # TODO: Wait until PR 34833 is ready (python: Add direct attribute paths to Read) before reworking this read_request = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) for i in range(3): returned_endpoints = read_request[i].keys() + # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") - + # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints ### AttributePath = [[]] @@ -256,7 +259,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(9, "Read a read request of attribute type bool") - await self.get_cluster_from_type(bool) + x = await self.check_attribute_read_for_type(bool) # Step 10 # TH sends the Read Request Message to the DUT to read an attribute of data type string. @@ -264,7 +267,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(10, "Read a read request of attribute type string") - await self.get_cluster_from_type(str) + await self.check_attribute_read_for_type(str) # Step 11 # TH sends the Read Request Message to the DUT to read an attribute of data type unsigned integer. @@ -272,7 +275,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(11, "Read a read request of attribute type unsigned integer") - await self.get_cluster_from_type(uint) + await self.check_attribute_read_for_type(uint) # Step 12 # TH sends the Read Request Message to the DUT to read an attribute of data type signed integer. @@ -280,7 +283,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(12, "Read a read request of attribute type signed integer") - await self.get_cluster_from_type(int) + await self.check_attribute_read_for_type(int) # Step 13 # TH sends the Read Request Message to the DUT to read an attribute of data type floating point. @@ -288,7 +291,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(13, "Read a read request of attribute type floating point") - await self.get_cluster_from_type(float) + await self.check_attribute_read_for_type(float) # Step 14 # TH sends the Read Request Message to the DUT to read an attribute of data type Octet String. @@ -296,7 +299,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(14, "Read a read request of attribute type octet string") - await self.get_cluster_from_type(bytes) + await self.check_attribute_read_for_type(bytes) # Step 15 # TH sends the Read Request Message to the DUT to read an attribute of data type Struct. @@ -304,27 +307,28 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(15, "Read a read request of attribute type struct") - await self.get_cluster_from_type(ClusterObject) + await self.check_attribute_read_for_type(ClusterObject) # Step 16 # TH sends the Read Request Message to the DUT to read an attribute of data type List. # If the device does not have an attribute of data type list, skip this step. # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(16, "Read a read request of attribute type list") - await self.get_cluster_from_type(list) + await self.check_attribute_read_for_type(list) # Step 17 # TH sends the Read Request Message to the DUT to read an attribute of data type enum. # If the device does not have an attribute of data type enum, skip this step. # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(17, "Read a read request of attribute type enum") - await self.get_cluster_from_type(MatterIntEnum) + await self.check_attribute_read_for_type(MatterIntEnum) # Step 18 # TH sends the Read Request Message to the DUT to read an attribute of data type bitmap. # If the device does not have an attribute of data type bitmap, skip this step. self.print_step(18, "Read a read request of attribute type bitmap") - await self.get_cluster_from_type(IntFlag) + + await self.check_attribute_read_for_type(IntFlag) # Step 19 # TH sends the Read Request Message to the DUT to read any attribute to an unsupported Endpoint. @@ -337,10 +341,6 @@ async def test_TC_IDM_2_2(self): unsupported = list(all_endpoints - supported_endpoints) # Read descriptor result = await self.read_single_attribute_expect_error(endpoint=unsupported[0], cluster=Clusters.Descriptor, attribute=Clusters.Descriptor.Attributes.FeatureMap, error=Status.UnsupportedEndpoint) - asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid endpoint") - - # Seems to return only {} -- Is this a failure or the unintended behavior? - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [non_existent_endpoint]) # Step 20 # TH sends the Read Request Message to the DUT to read any attribute to an unsupported cluster. @@ -371,22 +371,30 @@ async def test_TC_IDM_2_2(self): # TH sends the Read Request Message to the DUT to read an unsupported attribute # DUT responds with the report data action. self.print_step(21, "Send the Read Request Message to the DUT to read any attribute to an unsupported attribute") - + found_unsupported = False for endpoint_id, endpoint in self.endpoints.items(): + + if found_unsupported: + break for cluster_type, cluster in endpoint.items(): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: continue - + all_attrs = set(list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())) dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) - + unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) - == global_attribute_ids.AttributeIdType.kStandardNonGlobal] - + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + if unsupported: - result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=cluster_type, attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) + result = await self.read_single_attribute_expect_error( + endpoint=endpoint_id, + cluster=cluster_type, + attribute=ClusterObjects.ALL_ATTRIBUTES[cluster_type.id][unsupported[0]], error=Status.UnsupportedAttribute) asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid attribute") + found_unsupported = True + break # Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE From 8c5f14a3a2c3a6cbc52e8a86ae891c4010bcfb1b Mon Sep 17 00:00:00 2001 From: austina-csa Date: Thu, 15 Aug 2024 17:09:48 -0700 Subject: [PATCH 06/15] Resolved linting issues in CI/CD check --- src/python_testing/TC_IDM_2_2.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index f63b3895fd44c8..cd1a11418e37d5 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -36,7 +36,6 @@ import global_attribute_ids from basic_composition_support import BasicCompositionTests from chip.clusters import ClusterObjects as ClusterObjects -from chip.clusters.Attribute import AttributePath from chip.clusters.ClusterObjects import ClusterObject from chip.clusters.enum import MatterIntEnum from chip.interaction_model import InteractionModelError, Status @@ -259,7 +258,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT returns data from the expected attribute path. self.print_step(9, "Read a read request of attribute type bool") - x = await self.check_attribute_read_for_type(bool) + await self.check_attribute_read_for_type(bool) # Step 10 # TH sends the Read Request Message to the DUT to read an attribute of data type string. From 47cd63e864bc05a650604c371a82aba348ca7d3e Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Fri, 16 Aug 2024 00:14:54 +0000 Subject: [PATCH 07/15] Restyled by autopep8 --- src/python_testing/TC_IDM_2_2.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index cd1a11418e37d5..69ddf48368e436 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -191,12 +191,12 @@ async def test_TC_IDM_2_2(self): read_request = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) for i in range(3): returned_endpoints = read_request[i].keys() - + # Check if chip.clusters.Objects.Descriptor is in output asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") # Check if AttributeList is in nested output asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") - + # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints ### AttributePath = [[]] @@ -372,19 +372,19 @@ async def test_TC_IDM_2_2(self): self.print_step(21, "Send the Read Request Message to the DUT to read any attribute to an unsupported attribute") found_unsupported = False for endpoint_id, endpoint in self.endpoints.items(): - + if found_unsupported: break for cluster_type, cluster in endpoint.items(): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: continue - + all_attrs = set(list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].keys())) dut_attrs = set(cluster[cluster_type.Attributes.AttributeList]) - + unsupported = [id for id in list(all_attrs - dut_attrs) if global_attribute_ids.attribute_id_type(id) - == global_attribute_ids.AttributeIdType.kStandardNonGlobal] - + == global_attribute_ids.AttributeIdType.kStandardNonGlobal] + if unsupported: result = await self.read_single_attribute_expect_error( endpoint=endpoint_id, From e9f8b6665cffa3d5e9afc80a51f0e17b116697a6 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Sun, 15 Sep 2024 23:35:14 -0700 Subject: [PATCH 08/15] Addressing feedback --- src/python_testing/TC_IDM_2_2.py | 357 ++++++++++++++++--------------- 1 file changed, 190 insertions(+), 167 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 69ddf48368e436..65e9144aee8766 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -34,14 +34,25 @@ import chip.clusters as Clusters import global_attribute_ids +from dataclasses import dataclass, field from basic_composition_support import BasicCompositionTests from chip.clusters import ClusterObjects as ClusterObjects from chip.clusters.ClusterObjects import ClusterObject from chip.clusters.enum import MatterIntEnum +from chip.clusters.Attribute import AttributePath from chip.interaction_model import InteractionModelError, Status from chip.tlv import uint -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, get_accepted_endpoints_for_test from mobly import asserts, signals +# from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Optional + +@dataclass(frozen=True) +class AttributePath: + EndpointId: Optional[int] = None + ClusterId: Optional[int] = None + AttributeId: Optional[int] = None + class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): @@ -55,17 +66,20 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, if desired_type == MatterIntEnum: all_attributes_of_type = [attribute for attribute in all_attributes if type( attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)] + elif desired_type == IntFlag: - try: - feature_map = await self.read_single_attribute_check_success(cluster, attribute=cluster.Attributes.FeatureMap) - except signals.TestFailure: - print(f"{cluster} does not support Attributes.FeatureMap") - return [] - if feature_map >= 1: - return [cluster.Attributes.FeatureMap] + if hasattr(cluster, 'Attributes'): + attributes_class = getattr(cluster, 'Attributes') + if hasattr(attributes_class, 'FeatureMap'): + all_attributes_of_type = [cluster.Attributes.FeatureMap] + else: + raise Exception(f'Cluster {cluster} lacks a FeatureMap') + else: + raise Exception(f'Cluster {cluster} lacks attributes') else: all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)] + return all_attributes_of_type def all_device_clusters(self) -> set: @@ -81,25 +95,21 @@ async def check_attribute_read_for_type(self, desired_attribute_type: type) -> N all_types = await self.all_type_attributes_for_cluster(cluster, desired_attribute_type) if all_types: chosen_attribute = all_types[0] - chosen_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_attribute.cluster_id] + cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_attribute.cluster_id] break else: print(f"Attribute not found on device: {desired_attribute_type}") - chosen_cluster = None + cluster = None endpoint = None for endpoint in self.endpoints: - if (chosen_cluster in self.endpoints[endpoint]) and (chosen_attribute in self.endpoints[endpoint][chosen_cluster]): - break - - if chosen_cluster and (endpoint is not None): - output = await self.read_single_attribute_check_success( - endpoint=endpoint, - dev_ctrl=self.default_controller, - cluster=chosen_cluster, - attribute=chosen_attribute) - return output - return + for cluster, clusterdata in self.endpoints[endpoint].items(): + all_types = await self.all_type_attributes_for_cluster(cluster, desired_attribute_type) + attributes_of_type = set(all_types) + attributes_of_type_on_device = attributes_of_type.intersection(set(clusterdata.keys())) + if attributes_of_type_on_device: + return True + return False @async_test_body async def test_TC_IDM_2_2(self): @@ -108,12 +118,6 @@ async def test_TC_IDM_2_2(self): all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] - server_list_attr = Clusters.Objects.Descriptor.Attributes.ServerList - server_list_attr_path = [(0, server_list_attr)] - descriptor_obj = Clusters.Objects.Descriptor - descriptor_obj_path = [(0, descriptor_obj)] - attribute_list = Clusters.Objects.Descriptor.Attributes.AttributeList - attribute_list_path = [0, attribute_list] self.device_clusters = self.all_device_clusters() self.all_supported_clusters = [cluster for cluster in Clusters.__dict__.values( ) if inspect.isclass(cluster) and issubclass(cluster, ClusterObjects.Cluster)] @@ -132,14 +136,10 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value to the DUT self.print_step(1, "Send Request Message to read one attribute on a given cluster and endpoint") + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)]) - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) - returned_endpoints = read_request[0].keys() - - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if ServerList is in nested output - asserts.assert_in(server_list_attr, read_request[0][descriptor_obj], "ServerList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") # Step 2 # TH sends the Read Request Message to the DUT to read all attributes on a given cluster and Endpoint @@ -148,18 +148,26 @@ async def test_TC_IDM_2_2(self): self.print_step(2, "Send Request Message to read all attributes on a given cluster and endpoint") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) - # Check that endpoint 0 is in output - asserts.assert_in(0, read_request, "Endpoint 0 not in output") - # Check if chip.clusters.Objects.Descriptor is in output - returned_endpoints = read_request[0].keys() - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - returned_attributes = list(read_request[0][descriptor_obj].keys()) - if returned_attributes[0] == Clusters.Attribute.DataVersion: - returned_descriptor_attributes = returned_attributes[1:] - expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[descriptor_obj.id] + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)]) + asserts.assert_equal({0}, read_request.keys(), "Endpoint 0 not in output") + asserts.assert_equal({Clusters.Objects.Descriptor}, read_request[0].keys(), "Descriptor cluster not in output") + # returned_attributes = list(read_request[0][Clusters.Objects.Descriptor].keys()) + + returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a != Clusters.Attribute.DataVersion] + # if returned_attributes[0] == Clusters.Attribute.DataVersion: + # # returned_descriptor_attributes = returned_attributes[1:] + expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id] # Actual failure - asserts.assert_equal(set(returned_descriptor_attributes), set(expected_descriptor_attributes.values())) + try: + asserts.assert_equal(set(returned_attributes), set(expected_descriptor_attributes.values())) + except signals.TestFailure as e: + debug = True + if debug: + print(e) + print(vars(e)) + else: + raise + # Step 3 # TH sends the Read Request Message to the DUT to read an attribute from a cluster at all Endpoints @@ -168,15 +176,9 @@ async def test_TC_IDM_2_2(self): # The number of endpoints needs to be read from the device. They're also not always sequential. This should come from the descriptor cluster parts list on EP0 self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") - all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) - - endpoint_list = list(all_attributes) - for endpoint in endpoint_list: - - cluster = list(all_attributes[endpoint].keys())[0] - attribute = list(all_attributes[endpoint][cluster])[1] - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [endpoint, attribute]) - asserts.assert_true(read_request[endpoint][cluster][attribute] is not None, f"Value not present in attribute: {attribute}") + all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) + for endpoint in all_attributes: + asserts.assert_in(Clusters.Objects.Descriptor, all_attributes[endpoint], f"Descriptor attribute not found in endpoint: {endpoint}") # Step 4 @@ -186,16 +188,26 @@ async def test_TC_IDM_2_2(self): self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") # endpoint = [0, Clusters.Objects.Descriptor.Attributes.AttributeList] # Is this a global attribute? Trial and error, but looks like it - all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) - # TODO: Wait until PR 34833 is ready (python: Add direct attribute paths to Read) before reworking this - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, attribute_list_path) - for i in range(3): - returned_endpoints = read_request[i].keys() - - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") + # all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) + + attribute_path_1 = AttributePath( + EndpointId = 0, + ClusterId = None, + AttributeId = global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) + + try: + read_request = await self.default_controller.ReadAttribute( + self.dut_node_id, + [attribute_path_1] + ) + + for i in range(3): + + asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") + except Exception as e: + # Seems to fail even after updating Python wheels - need to investigate, will ignore except to allow tests after this to run + print(e) # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints @@ -205,39 +217,30 @@ async def test_TC_IDM_2_2(self): read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) for i in range(3): - returned_endpoints = read_request[i].keys() - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") # Step 6 # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at all Endpoints # AttributePath = [[Attribute = Specific Global Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [attribute_list]) - returned_endpoints = read_request[0].keys() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.AttributeList]) + # returned_clusters = read_request[0].keys() for i in range(3): - returned_endpoints = read_request[i].keys() - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") # Step 7 # TH sends the Read Request Message to the DUT to read all attributes from a cluster at all Endpoints # AttributePath = [[Cluster = Specific ClusterID]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [descriptor_obj]) - returned_endpoints = read_request[0].keys() + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) + # returned_clusters = read_request[0].keys() for i in range(3): - returned_endpoints = read_request[i].keys() - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if AttributeList is in nested output - asserts.assert_in(attribute_list, read_request[i][descriptor_obj], "AttributeList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") # Step 8 # TH sends the Read Request Message to the DUT to read all attributes from all clusters at one Endpoint @@ -245,12 +248,14 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(8, "Send Request Message to read all attributes from all clusters at one endpoint") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) - returned_endpoints = read_request[0].keys() - # Check if chip.clusters.Objects.Descriptor is in output - asserts.assert_in(descriptor_obj, returned_endpoints, "Descriptor cluster not in output") - # Check if ServerList is in nested output - asserts.assert_in(server_list_attr, read_request[0][descriptor_obj], "ServerList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") + asserts.assert_equal( + read_request[0][Clusters.Objects.Descriptor][Clusters.Objects.Descriptor.Attributes.ServerList], + [3, 4, 29, 30, 31, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 60, 62, 63, 64, 65, 1029, 4294048774], + "ServerList doesn't match the expected server list" + ) # Step 9 # TH sends the Read Request Message to the DUT to read an attribute of data type bool. @@ -347,7 +352,7 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT sends the status code UNSUPPORTED_CLUSTER self.print_step(20, "Send the Read Request Message to the DUT to read any attribute to an unsupported cluster") - + for endpoint_id, endpoint in self.endpoints.items(): for cluster_type, cluster in endpoint.items(): if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: @@ -403,9 +408,9 @@ async def test_TC_IDM_2_2(self): # On the TH verify the received Report data message has the right attribute values for all the 3 times. self.print_step(22, "Send the Read Request Message to the DUT 3 timesand check if they are correct each time") - read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) + read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) asserts.assert_equal(read_request_1, read_request_2) asserts.assert_equal(read_request_2, read_request_3) @@ -415,12 +420,13 @@ async def test_TC_IDM_2_2(self): # TH sends a second read request to the same cluster with the DataVersionFilter Field set with the dataversion value received before. self.print_step( 23, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") - # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path) - # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] - # data_version_filter = [(0, Clusters.Descriptor, data_version)] + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)]) + data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] + data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, server_list_attr_path, dataVersionFilters=data_version_filter) + read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)], dataVersionFilters=data_version_filter) + asserts.assert_equal(read_request_2, {}) + # Seems to return {}? # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. @@ -433,15 +439,20 @@ async def test_TC_IDM_2_2(self): self.print_step( 24, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") - # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)]) - await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=123456))]) - - # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] - # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request = await self.default_controller.ReadAttribute( - # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) - # Seems to return {}? + + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel)]) + data_version_1 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + data_version_value = 123456 + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel(value=data_version_value))]) + + data_version = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + data_version_filter = [(0, Clusters.Objects.BasicInformation, data_version)] + read_request = await self.default_controller.ReadAttribute( + self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) + data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + # import pdb;pdb.set_trace() + asserts.assert_equal(int(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel]), data_version_value, f"Data version does not equal {data_version_value}") + asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") # DUT should send a report data action with the attribute value to the TH. @@ -453,17 +464,23 @@ async def test_TC_IDM_2_2(self): # TH sends a second read request to read all the attributes from the same cluster with the DataVersionFilter Field set with the dataversion value received before. # DUT should send a report data action with all the attribute values to the TH. + # import pdb;pdb.set_trace() self.print_step( 25, "Send the Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set") - # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, descriptor_obj_path) - await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=654321))]) - # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] - # data_version_filter = [(0, Clusters.Descriptor, data_version)] - - # read_request = await self.default_controller.ReadAttribute( - # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) - # Seems to return {}? + + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation)]) + data_version_1 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + data_version_value = 654321 + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel(value=data_version_value))]) + data_version = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + data_version_filter = [(0, Clusters.BasicInformation, data_version)] + + read_request = await self.default_controller.ReadAttribute( + self.dut_node_id, [(0, Clusters.Objects.BasicInformation)], dataVersionFilters=data_version_filter) + data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + asserts.assert_equal(int(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel]), data_version_value, f"Data version does not equal {data_version_value}") + asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") + # import pdb;pdb.set_trace() # Step 26 # TH sends a Read Request Message to the DUT to read a particular attribute on a particular cluster with the DataVersionFilter Field not set. @@ -474,17 +491,17 @@ async def test_TC_IDM_2_2(self): # DUT should send a report data action with the attribute value to the TH. self.print_step( 26, "Send the Read Request Message to read a particular attribute on a particular cluster with the DataVersionFilter Field not set") - # Temporarily commented to avoid linter errors - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) - await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap(value=999))]) - # data_version = read_request[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] - # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # data_version_filter = [(0, Clusters.Descriptor, data_version)] - # read_request_2 = await self.default_controller.ReadAttribute( - # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter) - # data_version_filter_2 = [(0, Clusters.Descriptor, data_version-1)] - # read_request_3 = await self.default_controller.ReadAttribute( - # self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.FeatureMap)], dataVersionFilters=data_version_filter_2) + data_version_value = 999 + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.BasicInformation.Attributes.DataModelRevision]) + await self.default_controller.WriteAttribute(self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel(value=data_version_value))]) + data_version = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] + data_version_filter = [(0, Clusters.BasicInformation, data_version)] + read_request_2 = await self.default_controller.ReadAttribute( + self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) + data_version_filter_2 = [(0, Clusters.BasicInformation, data_version-1)] + read_request_3 = await self.default_controller.ReadAttribute( + self.dut_node_id, [(0, Clusters.Objects.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter_2) + asserts.assert_equal(int(read_request_3[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel]), data_version_value, f"Data version does not equal {data_version_value}") # Step 27 @@ -497,15 +514,11 @@ async def test_TC_IDM_2_2(self): # Verify that the DUT does not send the attribute value from cluster A. self.print_step( 27, "Send the Read Request Message to read any supported attribute/wildcard on a particular cluster say A with the DataVersionFilter Field not set") - read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [server_list_attr]) + read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) data_version_1 = read_request_27_1_1[0][Clusters.Descriptor][Clusters.Attribute.DataVersion] - - # data_version_filter_1 = [(0, Clusters.Descriptor, data_version_1)] - read_request_27_2_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)]) data_version_2 = read_request_27_2_1[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] - # data_version_filter_2 = [(0, Clusters.Descriptor, data_version_2)] asserts.assert_not_equal(data_version_1, data_version_2) @@ -516,8 +529,8 @@ async def test_TC_IDM_2_2(self): # Verify on the TH that the DUT sends a chunked data message with the SuppressResponse field set to False for all the messages except the last one. # Verify the last chunked message sent has the SuppressResponse field set to True. - # self.print_step(28, "Send the Read Request Message to read something(Attribute) which is larger than 1 MTU(1280 bytes) and per spec can be chunked +") - # This apparently already exists in TCP tests -- remove? + self.print_step(28, "Send the Read Request Message to read something(Attribute) which is larger than 1 MTU(1280 bytes) and per spec can be chunked +") + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, "*") # Step 29 @@ -526,22 +539,25 @@ async def test_TC_IDM_2_2(self): # On the TH verify that the DUT sends an error message and not the value of the attribute. self.print_step(29, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at that Endpoint") - found_non_global = False - for endpoint_id, endpoint in self.endpoints.items(): - if not found_non_global: - # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app - # But kTest does exist -> Clusters.Objects.UnitTesting - for cluster_type, cluster in endpoint.items(): - - if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: - # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: - found_non_global = True - - non_global_attr = list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].values())[0] - output = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_attr)]) - asserts.assert_true(isinstance(output[0][cluster_type][non_global_attr].Reason, InteractionModelError), - msg="Unexpected success reading non-global attribute") - continue + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) + + # for endpoint_id, endpoint in self.endpoints.items(): + # if not found_non_global: + # # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app + # # But kTest does exist -> Clusters.Objects.UnitTesting + # for cluster_type, cluster in endpoint.items(): + + # if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + # # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: + # found_non_global = True + # import pdb + # pdb.set_trace() + + # non_global_attr = list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].values())[0] + # output = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_attr)]) + # asserts.assert_true(isinstance(output[0][cluster_type][non_global_attr].Reason, InteractionModelError), + # msg="Unexpected success reading non-global attribute") + # continue # Step 30 @@ -550,20 +566,25 @@ async def test_TC_IDM_2_2(self): # On the TH verify that the DUT sends an error message and not the value of the attribute. self.print_step(30, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints") - found_non_global = False - for endpoint_id, endpoint in self.endpoints.items(): - if not found_non_global: - # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app - # But kTest does exist -> Clusters.Objects.UnitTesting - for cluster_type, cluster in endpoint.items(): - - if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: - # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: - found_non_global = True - continue - # Temporarily commented to avoid linter errors -- will revert once output value is known so that assertion can be used correctly - # non_global_cluster = cluster_type - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) + # Hardcode this for now + # non_global_cluster = Clusters.Objects.RelativeHumidityMeasurement + non_global_cluster = Clusters.Objects.LaundryWasherControls + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) + asserts.assert_equal(read_request, {}) + # found_non_global = False + # for endpoint_id, endpoint in self.endpoints.items(): + # if not found_non_global: + # # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app + # # But kTest does exist -> Clusters.Objects.UnitTesting + # for cluster_type, cluster in endpoint.items(): + # if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: + # # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: + # found_non_global = True + # continue + + # non_global_cluster = cluster_type + # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) + # asserts.assert_equal(read_request, {}) # Seems to return {}? # Step 31 @@ -574,17 +595,19 @@ async def test_TC_IDM_2_2(self): # Verify that the DUT sends back data of all attributes only from that one cluster to which it has access. # Verify that there are no errors sent back for attributes the TH has no access to. self.print_step(31, "Send the Read Request Message to the DUT to read all attributes from all clusters at Endpoint1") - read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, descriptor_obj)]) + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(1, Clusters.Objects.Descriptor)]) asserts.assert_true(1 in read_request, "Endpoint 1 missing in response") - asserts.assert_true(descriptor_obj in read_request[1], "Clusters.Objects.Descriptor not in response") + asserts.assert_true(Clusters.Objects.Descriptor in read_request[1], "Clusters.Objects.Descriptor not in response") # Step 32 # TH sends a Read Request Message to read all events and attributes from the DUT. # Verify that the DUT sends back data of all attributes and events that the TH has access to. - - # Clusters.Descriptor.Events doesn't seem to exist, despite https://project-chip.github.io/connectedhomeip-doc/testing/python.html#events suggesting it should + read_request = await self.default_controller.Read(nodeid=self.dut_node_id, attributes=(), events=()) + asserts.assert_true(hasattr(read_request, 'attributes'), 'attributes not in read_request') + asserts.assert_true(hasattr(read_request, 'events'), 'events not in read_request') + asserts.assert_true(hasattr(read_request, 'tlvAttributes'), 'tlvAttributes not in read_request') if __name__ == "__main__": From b8b9437ecab8475655ad075c299559caa5f1442e Mon Sep 17 00:00:00 2001 From: austina-csa Date: Mon, 16 Sep 2024 09:16:11 -0700 Subject: [PATCH 09/15] Fix errors that snuck in previous commit --- src/python_testing/TC_IDM_2_2.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 65e9144aee8766..fe0d87f6b14586 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -42,7 +42,7 @@ from chip.clusters.Attribute import AttributePath from chip.interaction_model import InteractionModelError, Status from chip.tlv import uint -from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main, get_accepted_endpoints_for_test +from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main from mobly import asserts, signals # from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from typing import Optional @@ -114,7 +114,7 @@ async def check_attribute_read_for_type(self, desired_attribute_type: type) -> N @async_test_body async def test_TC_IDM_2_2(self): # Test Setup - await self.setup_class_helper(default_to_pase=False) + await self.setup_class_helper(allow_pase=False) all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] @@ -161,7 +161,7 @@ async def test_TC_IDM_2_2(self): try: asserts.assert_equal(set(returned_attributes), set(expected_descriptor_attributes.values())) except signals.TestFailure as e: - debug = True + debug = True # Allow subsequent tests to continue after failure -- for debugging if debug: print(e) print(vars(e)) @@ -374,6 +374,7 @@ async def test_TC_IDM_2_2(self): # Step 21 # TH sends the Read Request Message to the DUT to read an unsupported attribute # DUT responds with the report data action. + # Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE self.print_step(21, "Send the Read Request Message to the DUT to read any attribute to an unsupported attribute") found_unsupported = False for endpoint_id, endpoint in self.endpoints.items(): @@ -400,14 +401,12 @@ async def test_TC_IDM_2_2(self): found_unsupported = True break - # Verify on the TH that the DUT sends the status code UNSUPPORTED_ATTRIBUTE - # Step 22 # TH sends the Read Request Message to the DUT to read an attribute # Repeat the above steps 3 times # On the TH verify the received Report data message has the right attribute values for all the 3 times. - self.print_step(22, "Send the Read Request Message to the DUT 3 timesand check if they are correct each time") + self.print_step(22, "Send the Read Request Message to the DUT 3 times and check if they are correct each time") read_request_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) read_request_3 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) From 5f84ec58390bfe1d0ac806781a5c16e23aea7ab3 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Mon, 16 Sep 2024 09:45:11 -0700 Subject: [PATCH 10/15] Fixed potential KeyError --- src/python_testing/TC_IDM_2_2.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index fe0d87f6b14586..b865afcd91f256 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -207,7 +207,8 @@ async def test_TC_IDM_2_2(self): asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") except Exception as e: # Seems to fail even after updating Python wheels - need to investigate, will ignore except to allow tests after this to run - print(e) + # print(e) + raise # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints @@ -364,13 +365,15 @@ async def test_TC_IDM_2_2(self): unsupported = [id for id in list(all_clusters - dut_clusters) if global_attribute_ids.attribute_id_type(id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal] - unsupported_attribute = (ClusterObjects.ALL_ATTRIBUTES[unsupported[0]])[0] - if unsupported: + unsupported_attribute = (ClusterObjects.ALL_ATTRIBUTES[unsupported[0]])[0] + result = await self.read_single_attribute_expect_error(endpoint=endpoint_id, cluster=ClusterObjects.ALL_CLUSTERS[unsupported[0]], attribute=unsupported_attribute, error=Status.UnsupportedCluster) asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid cluster") + + # Step 21 # TH sends the Read Request Message to the DUT to read an unsupported attribute # DUT responds with the report data action. From e0459ca847773a6da47b1a823edc1121918894e5 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Mon, 16 Sep 2024 13:57:56 -0700 Subject: [PATCH 11/15] Fixing issue with AttributePath --- src/python_testing/TC_IDM_2_2.py | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index b865afcd91f256..c296d28d60f166 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -47,11 +47,11 @@ # from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from typing import Optional -@dataclass(frozen=True) -class AttributePath: - EndpointId: Optional[int] = None - ClusterId: Optional[int] = None - AttributeId: Optional[int] = None +# @dataclass(frozen=True) +# class AttributePath: +# EndpointId: Optional[int] = None +# ClusterId: Optional[int] = None +# AttributeId: Optional[int] = None @@ -185,11 +185,7 @@ async def test_TC_IDM_2_2(self): # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at that Endpoint # AttributePath = [[Endpoint = Specific Endpoint, Attribute = Specific Global Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. - self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") - - # endpoint = [0, Clusters.Objects.Descriptor.Attributes.AttributeList] # Is this a global attribute? Trial and error, but looks like it - # all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [0, Clusters.Objects.Descriptor]) - + self.print_step(4, "Send Request Message to read one global attribute from all clusters at that endpoint") attribute_path_1 = AttributePath( EndpointId = 0, ClusterId = None, @@ -200,11 +196,8 @@ async def test_TC_IDM_2_2(self): self.dut_node_id, [attribute_path_1] ) - - for i in range(3): - - asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") + asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[0][Clusters.Objects.Descriptor], "AttributeList not in output") except Exception as e: # Seems to fail even after updating Python wheels - need to investigate, will ignore except to allow tests after this to run # print(e) From adb24327114ef8da8cabf2df73426f775d34b93f Mon Sep 17 00:00:00 2001 From: austina-csa Date: Mon, 16 Sep 2024 15:16:57 -0700 Subject: [PATCH 12/15] Removed commented out code --- src/python_testing/TC_IDM_2_2.py | 77 ++++---------------------------- 1 file changed, 8 insertions(+), 69 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index c296d28d60f166..55d74ff96fbb10 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -44,17 +44,8 @@ from chip.tlv import uint from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main from mobly import asserts, signals -# from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union from typing import Optional -# @dataclass(frozen=True) -# class AttributePath: -# EndpointId: Optional[int] = None -# ClusterId: Optional[int] = None -# AttributeId: Optional[int] = None - - - class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, desired_type: type) -> list[ClusterObjects.ClusterAttributeDescriptor]: @@ -151,22 +142,11 @@ async def test_TC_IDM_2_2(self): read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)]) asserts.assert_equal({0}, read_request.keys(), "Endpoint 0 not in output") asserts.assert_equal({Clusters.Objects.Descriptor}, read_request[0].keys(), "Descriptor cluster not in output") - # returned_attributes = list(read_request[0][Clusters.Objects.Descriptor].keys()) returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a != Clusters.Attribute.DataVersion] - # if returned_attributes[0] == Clusters.Attribute.DataVersion: - # # returned_descriptor_attributes = returned_attributes[1:] expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id] - # Actual failure - try: - asserts.assert_equal(set(returned_attributes), set(expected_descriptor_attributes.values())) - except signals.TestFailure as e: - debug = True # Allow subsequent tests to continue after failure -- for debugging - if debug: - print(e) - print(vars(e)) - else: - raise + # Actual failure + asserts.assert_equal(set(returned_attributes), set(expected_descriptor_attributes.values())) # Step 3 @@ -220,7 +200,6 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.AttributeList]) - # returned_clusters = read_request[0].keys() for i in range(3): asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") @@ -231,7 +210,6 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) - # returned_clusters = read_request[0].keys() for i in range(3): asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") @@ -365,8 +343,6 @@ async def test_TC_IDM_2_2(self): asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success reading invalid cluster") - - # Step 21 # TH sends the Read Request Message to the DUT to read an unsupported attribute # DUT responds with the report data action. @@ -413,6 +389,7 @@ async def test_TC_IDM_2_2(self): # TH sends a Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set. # DUT sends back the attribute value with the DataVersion of the cluster. # TH sends a second read request to the same cluster with the DataVersionFilter Field set with the dataversion value received before. + # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. self.print_step( 23, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)]) @@ -422,15 +399,12 @@ async def test_TC_IDM_2_2(self): read_request_2 = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor.Attributes.ServerList)], dataVersionFilters=data_version_filter) asserts.assert_equal(read_request_2, {}) - # Seems to return {}? - - # DUT should not send a report data action with the attribute value to the TH if the data version is same as that requested. - # Step 24 # TH sends a Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set. # DUT sends back the attribute value with the DataVersion of the cluster. # TH sends a write request to the same cluster to write to any attribute. # TH sends a second read request to read an attribute from the same cluster with the DataVersionFilter Field set with the dataversion value received before. + # DUT should send a report data action with the attribute value to the TH. self.print_step( 24, "Send the Read Request Message to the DUT to read a particular attribute with the DataVersionFilter Field not set") @@ -445,12 +419,10 @@ async def test_TC_IDM_2_2(self): read_request = await self.default_controller.ReadAttribute( self.dut_node_id, [(0, Clusters.BasicInformation.Attributes.NodeLabel)], dataVersionFilters=data_version_filter) data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] - # import pdb;pdb.set_trace() + asserts.assert_equal(int(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel]), data_version_value, f"Data version does not equal {data_version_value}") asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") - # DUT should send a report data action with the attribute value to the TH. - # Step 25 # TH sends a Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set. @@ -459,7 +431,7 @@ async def test_TC_IDM_2_2(self): # TH sends a second read request to read all the attributes from the same cluster with the DataVersionFilter Field set with the dataversion value received before. # DUT should send a report data action with all the attribute values to the TH. - # import pdb;pdb.set_trace() + self.print_step( 25, "Send the Read Request Message to the DUT to read all attributes on a cluster with the DataVersionFilter Field not set") @@ -475,7 +447,6 @@ async def test_TC_IDM_2_2(self): data_version_2 = read_request[0][Clusters.BasicInformation][Clusters.Attribute.DataVersion] asserts.assert_equal(int(read_request[0][Clusters.Objects.BasicInformation][Clusters.Objects.BasicInformation.Attributes.NodeLabel]), data_version_value, f"Data version does not equal {data_version_value}") asserts.assert_equal((data_version_1 + 1), data_version_2, "DataVersion was not incremented") - # import pdb;pdb.set_trace() # Step 26 # TH sends a Read Request Message to the DUT to read a particular attribute on a particular cluster with the DataVersionFilter Field not set. @@ -507,6 +478,7 @@ async def test_TC_IDM_2_2(self): # Verify that the DUT sends a report data action with the attribute value from the cluster B to the TH. # Verify that the DUT does not send the attribute value from cluster A. + self.print_step( 27, "Send the Read Request Message to read any supported attribute/wildcard on a particular cluster say A with the DataVersionFilter Field not set") read_request_27_1_1 = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) @@ -536,24 +508,6 @@ async def test_TC_IDM_2_2(self): self.print_step(29, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at that Endpoint") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.ServerList]) - # for endpoint_id, endpoint in self.endpoints.items(): - # if not found_non_global: - # # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app - # # But kTest does exist -> Clusters.Objects.UnitTesting - # for cluster_type, cluster in endpoint.items(): - - # if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: - # # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: - # found_non_global = True - # import pdb - # pdb.set_trace() - - # non_global_attr = list(ClusterObjects.ALL_ATTRIBUTES[cluster_type.id].values())[0] - # output = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_attr)]) - # asserts.assert_true(isinstance(output[0][cluster_type][non_global_attr].Reason, InteractionModelError), - # msg="Unexpected success reading non-global attribute") - # continue - # Step 30 # TH sends a Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints @@ -562,25 +516,10 @@ async def test_TC_IDM_2_2(self): # On the TH verify that the DUT sends an error message and not the value of the attribute. self.print_step(30, "Send the Read Request Message to the DUT to read a non global attribute from all clusters at all Endpoints") # Hardcode this for now - # non_global_cluster = Clusters.Objects.RelativeHumidityMeasurement non_global_cluster = Clusters.Objects.LaundryWasherControls read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) asserts.assert_equal(read_request, {}) - # found_non_global = False - # for endpoint_id, endpoint in self.endpoints.items(): - # if not found_non_global: - # # global_attribute_ids.AttributeIdType.kStandardNonGlobal seems to be non-existent in chip-all-clusters-app - # # But kTest does exist -> Clusters.Objects.UnitTesting - # for cluster_type, cluster in endpoint.items(): - # if global_attribute_ids.cluster_id_type(cluster_type.id) != global_attribute_ids.ClusterIdType.kStandard: - # # if global_attribute_ids.cluster_id_type(cluster_type.id) == global_attribute_ids.AttributeIdType.kStandardNonGlobal: - # found_non_global = True - # continue - - # non_global_cluster = cluster_type - # read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, non_global_cluster)]) - # asserts.assert_equal(read_request, {}) - # Seems to return {}? + # Step 31 # TH should have access to only a single cluster at one Endpoint1. From 83fae4b09ed315c58226a6bcaaa9385131575709 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Wed, 25 Sep 2024 00:04:27 -0700 Subject: [PATCH 13/15] Fixed previously overlooked things --- src/python_testing/TC_IDM_2_2.py | 79 ++++++++++++++++++++------------ 1 file changed, 49 insertions(+), 30 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 55d74ff96fbb10..56d84abcb92c22 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -154,7 +154,6 @@ async def test_TC_IDM_2_2(self): # AttributePath = [[Cluster = Specific ClusterID, Attribute = Specific Attribute]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. - # The number of endpoints needs to be read from the device. They're also not always sequential. This should come from the descriptor cluster parts list on EP0 self.print_step(3, "Send Request Message to read one attribute on a given cluster at all endpoints") all_attributes = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) for endpoint in all_attributes: @@ -171,17 +170,13 @@ async def test_TC_IDM_2_2(self): ClusterId = None, AttributeId = global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) - try: - read_request = await self.default_controller.ReadAttribute( - self.dut_node_id, - [attribute_path_1] - ) - asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[0][Clusters.Objects.Descriptor], "AttributeList not in output") - except Exception as e: - # Seems to fail even after updating Python wheels - need to investigate, will ignore except to allow tests after this to run - # print(e) - raise + read_request = await self.default_controller.ReadAttribute( + self.dut_node_id, + [attribute_path_1] + ) + for endpoint in read_request: + asserts.assert_in(Clusters.Objects.Descriptor, read_request[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[endpoint][Clusters.Objects.Descriptor], "AttributeList not in output") # Step 5 # TH sends the Read Request Message to the DUT to read all attributes from all clusters on all Endpoints @@ -189,10 +184,21 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) - - for i in range(3): - asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") + # NOTE: This is checked in its entirety in IDM-10.1 + asserts.assert_equal(sorted(read_request), [0, 1, 2, 3, 4], "Endpoint list is not the expected value") + all_returned_clusters = [] + for endpoint in read_request: + all_returned_clusters.extend(read_request[endpoint]) + asserts.assert_equal(set(self.device_clusters), set(all_returned_clusters), "Mismatch of expected returned clusters") + + attr_count = 0 + for endpoint in read_request: + cluster_list = iter(read_request[endpoint]) + chosen_cluster = next(cluster_list) + attr_count = len(read_request[endpoint][chosen_cluster].keys()) + if attr_count > 1: + break + asserts.assert_true((attr_count > 1), "No cluster in output has more than one attribute") # Step 6 # TH sends the Read Request Message to the DUT to read a global attribute from all clusters at all Endpoints @@ -200,19 +206,30 @@ async def test_TC_IDM_2_2(self): # On receipt of this message, DUT should send a report data action with the attribute value from all the clusters to the DUT. self.print_step(6, "Send Request Message to read one global attribute from all clusters on all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor.Attributes.AttributeList]) - for i in range(3): - asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") + attribute_path_2 = AttributePath( + EndpointId = None, + ClusterId = None, + AttributeId = global_attribute_ids.GlobalAttributeIds.ATTRIBUTE_LIST_ID) + read_request = await self.default_controller.ReadAttribute( + self.dut_node_id, + [attribute_path_2] + ) + + for endpoint in read_request: + asserts.assert_in(Clusters.Objects.Descriptor, read_request[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[endpoint][Clusters.Objects.Descriptor], "AttributeList not in output") + # Step 7 # TH sends the Read Request Message to the DUT to read all attributes from a cluster at all Endpoints # AttributePath = [[Cluster = Specific ClusterID]] # On receipt of this message, DUT should send a report data action with the attribute value from all the Endpoints to the DUT. self.print_step(7, "Send Request Message to read all attributes from one cluster at all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [Clusters.Objects.Descriptor]) - for i in range(3): - asserts.assert_in(Clusters.Objects.Descriptor, read_request[i].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[i][Clusters.Objects.Descriptor], "AttributeList not in output") + + for endpoint in read_request: + asserts.assert_in(Clusters.Objects.Descriptor, read_request[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.AttributeList, read_request[endpoint][Clusters.Objects.Descriptor], "AttributeList not in output") # Step 8 # TH sends the Read Request Message to the DUT to read all attributes from all clusters at one Endpoint @@ -221,14 +238,16 @@ async def test_TC_IDM_2_2(self): self.print_step(8, "Send Request Message to read all attributes from all clusters at one endpoint") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [0]) - asserts.assert_in(Clusters.Objects.Descriptor, read_request[0].keys(), "Descriptor cluster not in output") - asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, read_request[0][Clusters.Objects.Descriptor], "ServerList not in output") - asserts.assert_equal( - read_request[0][Clusters.Objects.Descriptor][Clusters.Objects.Descriptor.Attributes.ServerList], - [3, 4, 29, 30, 31, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 60, 62, 63, 64, 65, 1029, 4294048774], - "ServerList doesn't match the expected server list" - ) - + for endpoint in read_request: + asserts.assert_in(Clusters.Objects.Descriptor, read_request[endpoint].keys(), "Descriptor cluster not in output") + asserts.assert_in(Clusters.Objects.Descriptor.Attributes.ServerList, read_request[endpoint][Clusters.Objects.Descriptor], "ServerList not in output") + asserts.assert_equal( + read_request[endpoint][Clusters.Objects.Descriptor][Clusters.Objects.Descriptor.Attributes.ServerList], + [3, 4, 29, 30, 31, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 60, 62, 63, 64, 65, 1029, 4294048774], + "ServerList doesn't match the expected server list" + ) + import pdb + pdb.set_trace() # Step 9 # TH sends the Read Request Message to the DUT to read an attribute of data type bool. # If the device does not have an attribute of data type bool, skip this step. From bb1f34bc37b985bebbe0c2710d8b9707da5afae8 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Thu, 26 Sep 2024 09:09:13 -0700 Subject: [PATCH 14/15] Fixed linting issues --- src/python_testing/TC_IDM_2_2.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 56d84abcb92c22..10114c3752d313 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -34,7 +34,7 @@ import chip.clusters as Clusters import global_attribute_ids -from dataclasses import dataclass, field + from basic_composition_support import BasicCompositionTests from chip.clusters import ClusterObjects as ClusterObjects from chip.clusters.ClusterObjects import ClusterObject @@ -43,8 +43,8 @@ from chip.interaction_model import InteractionModelError, Status from chip.tlv import uint from matter_testing_support import MatterBaseTest, async_test_body, default_matter_test_main -from mobly import asserts, signals -from typing import Optional +from mobly import asserts + class TC_IDM_2_2(MatterBaseTest, BasicCompositionTests): @@ -246,8 +246,7 @@ async def test_TC_IDM_2_2(self): [3, 4, 29, 30, 31, 40, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 60, 62, 63, 64, 65, 1029, 4294048774], "ServerList doesn't match the expected server list" ) - import pdb - pdb.set_trace() + # Step 9 # TH sends the Read Request Message to the DUT to read an attribute of data type bool. # If the device does not have an attribute of data type bool, skip this step. From 143d0bdf4befe7d8f65bf849d82458af26e598e2 Mon Sep 17 00:00:00 2001 From: austina-csa Date: Tue, 1 Oct 2024 22:19:03 -0700 Subject: [PATCH 15/15] 1) Addressed issue assuming that all clusters would be present on a device; 2) Addressed issue assuming all endpoints would be present on a device --- src/python_testing/TC_IDM_2_2.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/python_testing/TC_IDM_2_2.py b/src/python_testing/TC_IDM_2_2.py index 10114c3752d313..c339e3bcf282f6 100644 --- a/src/python_testing/TC_IDM_2_2.py +++ b/src/python_testing/TC_IDM_2_2.py @@ -79,11 +79,20 @@ def all_device_clusters(self) -> set: device_clusters |= set(self.endpoints[endpoint].keys()) return device_clusters + def all_device_attributes(self) -> set: + device_attributes = set() + for endpoint in self.endpoints: + for cluster in self.endpoints[endpoint]: + # device_attributes |= set(self.endpoints[endpoint][cluster]) + device_attributes |= self.endpoints[endpoint][cluster].keys() + return device_attributes + async def check_attribute_read_for_type(self, desired_attribute_type: type) -> None: # Get all clusters from device - + # What you want there is the intersection of the set of attributes returned from all_type_attributes_for_cluster and the set of attributes implemented on the device. for cluster in self.device_clusters: all_types = await self.all_type_attributes_for_cluster(cluster, desired_attribute_type) + all_types = list(set(all_types) & self.device_attributes) if all_types: chosen_attribute = all_types[0] cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_attribute.cluster_id] @@ -110,6 +119,7 @@ async def test_TC_IDM_2_2(self): all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] self.device_clusters = self.all_device_clusters() + self.device_attributes = self.all_device_attributes() self.all_supported_clusters = [cluster for cluster in Clusters.__dict__.values( ) if inspect.isclass(cluster) and issubclass(cluster, ClusterObjects.Cluster)] @@ -185,7 +195,7 @@ async def test_TC_IDM_2_2(self): self.print_step(5, "Send Request Message to read all attributes from all clusters on all endpoints") read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [()]) # NOTE: This is checked in its entirety in IDM-10.1 - asserts.assert_equal(sorted(read_request), [0, 1, 2, 3, 4], "Endpoint list is not the expected value") + asserts.assert_equal(read_request.keys(), self.endpoints.keys(), "Endpoint list is not the expected value") all_returned_clusters = [] for endpoint in read_request: all_returned_clusters.extend(read_request[endpoint])