From d4f9756f5f43aa23ee25ca114448e961172d1de0 Mon Sep 17 00:00:00 2001 From: Bashir Sadjad Date: Fri, 14 Jul 2023 01:47:51 -0400 Subject: [PATCH 1/3] Fixes missing QuestionnaireResponse `answer` fields (#759) --- .../converters/DefinitionToAvroVisitor.java | 2 + .../r4/R4StructureDefinitions.java | 24 ++++------ .../stu3/Stu3StructureDefinitions.java | 24 ++++------ .../definitions/DefinitionVisitorsUtil.java | 46 ++++++++++++++++++- bunsen/pom.xml | 2 +- cloudbuild.yaml | 20 ++++---- docker/config/flink-conf.yaml | 7 ++- utils/flink-conf.yaml | 25 ++++++++-- 8 files changed, 102 insertions(+), 48 deletions(-) diff --git a/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java b/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java index 0e199692e..bf6ac7982 100644 --- a/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java +++ b/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java @@ -743,6 +743,8 @@ private static String lowercase(String string) { @Override public int getMaxDepth(String elementTypeUrl, String path) { + // should be an odd number! 
+ // return 3; return 1; } } diff --git a/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java b/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java index 2de9ba529..e8c477b61 100644 --- a/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java +++ b/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java @@ -3,6 +3,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.support.IValidationSupport; import com.cerner.bunsen.definitions.DefinitionVisitor; +import com.cerner.bunsen.definitions.DefinitionVisitorsUtil; import com.cerner.bunsen.definitions.FhirConversionSupport; import com.cerner.bunsen.definitions.QualifiedPath; import com.cerner.bunsen.definitions.StructureDefinitions; @@ -70,15 +71,6 @@ private List getChildren(ElementDefinition parent, .collect(Collectors.toList()); } - private String elementName(ElementDefinition element) { - - String suffix = element.getPath().substring(element.getPath().lastIndexOf('.') + 1); - - return suffix.endsWith("[x]") - ? 
suffix.substring(0, suffix.length() - 3) - : suffix; - } - private StructureDefinition getDefinition(ElementDefinition element) { return element.getTypeFirstRep() == null @@ -292,7 +284,7 @@ private List> elementToFields(DefinitionVisitor visitor List definitions, Deque stack) { - String elementName = elementName(element); + String elementName = DefinitionVisitorsUtil.elementName(element.getPath()); if (shouldTerminateRecursive(visitor, rootDefinition, element, stack)) { @@ -386,7 +378,7 @@ private List> elementToFields(DefinitionVisitor visitor rootDefinition, definitions, stack, element); T result = visitor.visitComposite(elementName, - element.getPath(), + DefinitionVisitorsUtil.pathFromStack(elementName, stack), elementName, rootDefinition.getUrl(), childElements); @@ -417,7 +409,7 @@ private List> elementToFields(DefinitionVisitor visitor } else { T type = transform(visitor, element, definition, stack); - return singleField(elementName(element), type); + return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type); } } else { @@ -493,7 +485,7 @@ private StructureField transformContained(DefinitionVisitor visitor, stack.pop(); - String rootName = elementName(containedRootElement); + String rootName = DefinitionVisitorsUtil.elementName(containedRootElement.getPath()); T result = visitor.visitComposite(rootName, containedRootElement.getPath(), @@ -603,7 +595,7 @@ private T transform(DefinitionVisitor visitor, if ("Reference".equals(definition.getType())) { // TODO: if this is in an option there may be other non-reference types here? 
- String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); List referenceTypes = parentElement.getType() .stream() @@ -630,7 +622,7 @@ private T transform(DefinitionVisitor visitor, } else { - String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); // We don't want 'id' to be present in nested fields to make it consistent with SQL-on-FHIR. // https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md#id-fields-omitted @@ -679,7 +671,7 @@ private T transformRoot(DefinitionVisitor visitor, stack.pop(); - String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); return visitor.visitComposite(rootName, rootName, diff --git a/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java b/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java index b1efcfd17..1a3436e10 100644 --- a/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java +++ b/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java @@ -3,6 +3,7 @@ import ca.uhn.fhir.context.FhirContext; import ca.uhn.fhir.context.support.IValidationSupport; import com.cerner.bunsen.definitions.DefinitionVisitor; +import com.cerner.bunsen.definitions.DefinitionVisitorsUtil; import com.cerner.bunsen.definitions.FhirConversionSupport; import com.cerner.bunsen.definitions.QualifiedPath; import com.cerner.bunsen.definitions.StructureDefinitions; @@ -60,15 +61,6 @@ private List getChildren(ElementDefinition parent, .collect(Collectors.toList()); } - private String elementName(ElementDefinition element) { - - String suffix = element.getPath().substring(element.getPath().lastIndexOf('.') + 1); - - return suffix.endsWith("[x]") - ? 
suffix.substring(0, suffix.length() - 3) - : suffix; - } - private StructureDefinition getDefinition(ElementDefinition element) { return element.getTypeFirstRep() == null @@ -263,7 +255,7 @@ private List> elementToFields(DefinitionVisitor visitor List definitions, Deque stack) { - String elementName = elementName(element); + String elementName = DefinitionVisitorsUtil.elementName(element.getPath()); if (shouldTerminateRecursive(visitor, rootDefinition, element, stack)) { @@ -382,7 +374,7 @@ private List> elementToFields(DefinitionVisitor visitor } else { T type = transform(visitor, element, definition, stack); - return singleField(elementName(element), type); + return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type); } } else { @@ -392,7 +384,7 @@ private List> elementToFields(DefinitionVisitor visitor definitions, stack, element); T result = visitor.visitComposite(elementName, - element.getPath(), + DefinitionVisitorsUtil.pathFromStack(elementName, stack), elementName, rootDefinition.getUrl(), childElements); @@ -458,7 +450,7 @@ private StructureField transformContained(DefinitionVisitor visitor, stack.pop(); - String rootName = elementName(containedRootElement); + String rootName = DefinitionVisitorsUtil.elementName(containedRootElement.getPath()); T result = visitor.visitComposite(rootName, containedRootElement.getPath(), @@ -568,7 +560,7 @@ private T transform(DefinitionVisitor visitor, if ("Reference".equals(definition.getType())) { // TODO: if this is in an option there may be other non-reference types here? 
- String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); List referenceTypes = parentElement.getType() .stream() @@ -590,7 +582,7 @@ private T transform(DefinitionVisitor visitor, } else { - String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); // We don't want 'id' to be present in nested fields to make it consistent with SQL-on-FHIR. // https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md#id-fields-omitted @@ -638,7 +630,7 @@ private T transformRoot(DefinitionVisitor visitor, stack.pop(); - String rootName = elementName(root); + String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); return visitor.visitComposite(rootName, rootName, diff --git a/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/DefinitionVisitorsUtil.java b/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/DefinitionVisitorsUtil.java index aacf04a92..eaed98f7c 100644 --- a/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/DefinitionVisitorsUtil.java +++ b/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/DefinitionVisitorsUtil.java @@ -1,8 +1,13 @@ package com.cerner.bunsen.definitions; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; /** @@ -14,7 +19,6 @@ public class DefinitionVisitorsUtil { private static final Pattern STRUCTURE_URL_PATTERN = Pattern.compile("http:\\/\\/hl7.org\\/fhir(\\/.*)?\\/StructureDefinition\\/([^\\/]*)$"); - /** * Helper method to convert a given element path that's delimited by period to a concatenated * string in title case. 
@@ -61,4 +65,44 @@ public static String namespaceFor(String basePackage, String structureDefinition } } + /** + * This is to extract the full path of an element based on the element traversal stack. + * The reason for using the full path from stack is to differentiate between same types at + * different recursion levels, e.g., when a child is dropped because of recursion max-depth + * while in a different traversal (for the same type) that child is present. Note in both those + * cases the element name and path would be the same hence we need to rely on the full traversal + * stack. As an example consider these two fields: + * QuestionnaireResponse.item.answer.item + * QuestionnaireResponse.item.answer.item.answer.item + * Both of these are `item` with "element path" being `QuestionnaireResponse.item` but the + * `answer` field might have been dropped in the second one because of recursion limits. + * + * @param elemName the name of the target element + * @param stack the current traversal stack leading to `elemName` + * @return the full path for `elemName` created using element names on the stack + */ + public static String pathFromStack(String elemName, Deque stack) { + // TODO add unit-tests for this method. + List fullPath = new ArrayList<>(); + Iterator iter = stack.descendingIterator(); + while (iter.hasNext()) { + String path = iter.next().getElementPath(); + fullPath.add(elementName(path)); + } + fullPath.add(elemName); + return fullPath.stream().collect(Collectors.joining(".")); + } + + /** + * Creates a canonical element name from the element path. + */ + public static String elementName(String elementPath) { + + String suffix = elementPath.substring(elementPath.lastIndexOf('.') + 1); + + return suffix.endsWith("[x]") + ? 
suffix.substring(0, suffix.length() - 3) + : suffix; + } + } diff --git a/bunsen/pom.xml b/bunsen/pom.xml index 51aedea41..8a2bb420b 100644 --- a/bunsen/pom.xml +++ b/bunsen/pom.xml @@ -30,7 +30,7 @@ 3.5.3 3.5.0 3.4.3 - 1.7.36 + 2.0.7 1.4.7 diff --git a/cloudbuild.yaml b/cloudbuild.yaml index c56cb0431..de7d64a30 100644 --- a/cloudbuild.yaml +++ b/cloudbuild.yaml @@ -34,16 +34,16 @@ steps: - mvn --no-transfer-progress -e install -Dlicense.skip=true -Dspotless.apply.skip=true waitFor: ['-'] -- name: 'gcr.io/cloud-builders/curl' - id: 'Upload coverage reports to codecov.io' - entrypoint: bash - args: ['-c', 'bash <(curl -s https://codecov.io/bash)'] - env: - - 'VCS_COMMIT_ID=$COMMIT_SHA' - - 'VCS_BRANCH_NAME=$BRANCH_NAME' - - 'VCS_PULL_REQUEST=$_PR_NUMBER' - - 'CI_BUILD_ID=$BUILD_ID' - - 'CODECOV_TOKEN=$_CODECOV_TOKEN' # This token is generated when you setup your repo on codecov: https://docs.codecov.com/docs +# - name: 'gcr.io/cloud-builders/curl' +# id: 'Upload coverage reports to codecov.io' +# entrypoint: bash +# args: ['-c', 'bash <(curl -s https://codecov.io/bash)'] +# env: +# - 'VCS_COMMIT_ID=$COMMIT_SHA' +# - 'VCS_BRANCH_NAME=$BRANCH_NAME' +# - 'VCS_PULL_REQUEST=$_PR_NUMBER' +# - 'CI_BUILD_ID=$BUILD_ID' +# - 'CODECOV_TOKEN=$_CODECOV_TOKEN' # This token is generated when you setup your repo on codecov: https://docs.codecov.com/docs - name: 'gcr.io/cloud-builders/docker' id: 'Build Uploader Image' diff --git a/docker/config/flink-conf.yaml b/docker/config/flink-conf.yaml index fe2d0bfdb..6778cc48c 100644 --- a/docker/config/flink-conf.yaml +++ b/docker/config/flink-conf.yaml @@ -18,9 +18,14 @@ # exceptions when running the merger on large input with many workers. taskmanager.memory.network.max: 5gb +# This is needed to be able to process large resources, otherwise in JDBC +# mode we may get the following exception: +# "The record exceeds the maximum size of a sort buffer ..." 
+taskmanager.memory.managed.size: 2gb + # This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately # this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 execution.attached: false # This is required to track the pipeline metrics when FlinkRunner is used. -execution.job-listeners: org.openmrs.analytics.metrics.FlinkJobListener \ No newline at end of file +execution.job-listeners: org.openmrs.analytics.metrics.FlinkJobListener diff --git a/utils/flink-conf.yaml b/utils/flink-conf.yaml index f286d0fc0..ce03570a0 100644 --- a/utils/flink-conf.yaml +++ b/utils/flink-conf.yaml @@ -1,8 +1,27 @@ -# To use this config, FLINK_CONF_DIR env. var should be set. +# Copyright 2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. -# This is needed to prevent a "Insufficient number of network buffers" +# To use this config, FLINK_CONF_DIR env. var should be set to the parent dir. + +# This is needed to prevent an "Insufficient number of network buffers" # exceptions when running the merger on large input with many workers. -taskmanager.network.memory.max: 5gb +taskmanager.memory.network.max: 5gb + +# This is needed to be able to process large resources, otherwise in JDBC +# mode we may get the following exception: +# "The record exceeds the maximum size of a sort buffer ..." 
+taskmanager.memory.managed.size: 2gb # This is to make pipeline.run() non-blocking with FlinkRunner; unfortunately # this is overwritten in `local` mode: https://stackoverflow.com/a/74416240 From ea2de8d4faaef36903a8553e86bf72bf2dd380ea Mon Sep 17 00:00:00 2001 From: suriyan3 <120384522+suriyan3@users.noreply.github.com> Date: Tue, 18 Jul 2023 22:39:14 +0530 Subject: [PATCH 2/3] Handling large resource parsing exception using DataFormatException (#756) --- .../org/openmrs/analytics/ConvertResourceFn.java | 12 +++++++++++- .../java/org/openmrs/analytics/MetricsConstants.java | 1 + 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java index 3fe64cc34..b7f160dbe 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java @@ -16,6 +16,7 @@ package org.openmrs.analytics; import ca.uhn.fhir.context.FhirVersionEnum; +import ca.uhn.fhir.parser.DataFormatException; import com.google.common.base.Strings; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -54,6 +55,8 @@ public class ConvertResourceFn extends FetchSearchPageFn { private final Boolean processDeletedRecords; + Counter counter = Metrics.counter(MetricsConstants.METRICS_NAMESPACE, MetricsConstants.DATA_FORMAT_EXCEPTION_ERROR); + ConvertResourceFn(FhirEtlOptions options, String stageIdentifier) { super(options, stageIdentifier); this.numFetchedResourcesMap = new HashMap(); @@ -116,7 +119,14 @@ public void writeResource(HapiRowDescriptor element) meta.addTag( new Coding(removeAction.getSystem(), removeAction.toCode(), removeAction.getDisplay())); } else { - resource = (Resource) parser.parseResource(jsonResource); + try { + resource = (Resource) parser.parseResource(jsonResource); + } + catch (DataFormatException e) { + 
log.error("DataFormatException Error occurred", e); + counter.inc(); + return; + } } totalParseTimeMillisMap.get(resourceType).inc(System.currentTimeMillis() - startTime); resource.setId(resourceId); diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/MetricsConstants.java b/pipelines/batch/src/main/java/org/openmrs/analytics/MetricsConstants.java index b5bb6d02d..f5b189159 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/MetricsConstants.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/MetricsConstants.java @@ -21,6 +21,7 @@ public class MetricsConstants { public static final String NUM_FETCHED_RESOURCES = "numFetchedResources_"; public static final String NUM_OUTPUT_RECORDS = "numOutputRecords"; public static final String NUM_DUPLICATES = "numDuplicates"; + public static final String DATA_FORMAT_EXCEPTION_ERROR = "numDataFormatExceptionErrors_"; public static final String TOTAL_FETCH_TIME_MILLIS = "totalFetchTimeMillis_"; public static final String TOTAL_GENERATE_TIME_MILLIS = "totalGenerateTimeMillis_"; public static final String TOTAL_PUSH_TIME_MILLIS = "totalPushTimeMillis_"; From 892ad88ab6c69993370ab325f2923e55cd92bf39 Mon Sep 17 00:00:00 2001 From: Bashir Sadjad Date: Fri, 21 Jul 2023 12:35:22 -0400 Subject: [PATCH 3/3] Fixed bugs in recursion depth and added a path example (#761) --- .../converters/DefinitionToAvroVisitor.java | 4 +- .../bunsen/avro/R4AvroConverterTest.java | 2 +- .../bunsen/avro/Stu3AvroConverterTest.java | 2 +- .../r4/R4StructureDefinitions.java | 171 +++++++----------- .../stu3/Stu3StructureDefinitions.java | 166 +++++++---------- .../definitions/HapiCompositeConverter.java | 4 +- .../openmrs/analytics/ConvertResourceFn.java | 7 +- .../openmrs/analytics/JdbcFetchHapiTest.java | 4 +- 8 files changed, 144 insertions(+), 216 deletions(-) diff --git a/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java 
b/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java index bf6ac7982..4a6546f4f 100644 --- a/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java +++ b/bunsen/bunsen-avro/src/main/java/com/cerner/bunsen/avro/converters/DefinitionToAvroVisitor.java @@ -452,6 +452,7 @@ public HapiConverter visitComposite(String elementName, String elementTypeUrl, List>> children) { + Preconditions.checkArgument(!children.isEmpty()); String recordName = DefinitionVisitorsUtil.recordNameFor(elementPath); String recordNamespace = DefinitionVisitorsUtil.namespaceFor(basePackage, elementTypeUrl); String fullName = recordNamespace + "." + recordName; @@ -743,8 +744,7 @@ private static String lowercase(String string) { @Override public int getMaxDepth(String elementTypeUrl, String path) { - // should be an odd number! - // return 3; + // return 2; return 1; } } diff --git a/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/R4AvroConverterTest.java b/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/R4AvroConverterTest.java index 6278e93bd..7c5590962 100644 --- a/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/R4AvroConverterTest.java +++ b/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/R4AvroConverterTest.java @@ -488,7 +488,7 @@ public void testCompile() throws IOException { // Ensure common types were generated Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/r4/avro/Period.java")); - Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/r4/avro/Coding.java")); + Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/r4/avro/PatientCoding.java")); Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/r4/avro/ValueSet.java")); // The specific profile should be created in the expecter4b-package. 
diff --git a/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/Stu3AvroConverterTest.java b/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/Stu3AvroConverterTest.java index e9b69a6e1..fa5894c5c 100644 --- a/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/Stu3AvroConverterTest.java +++ b/bunsen/bunsen-avro/src/test/java/com/cerner/bunsen/avro/Stu3AvroConverterTest.java @@ -485,7 +485,7 @@ public void testCompile() throws IOException { // Ensure common types were generated Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/stu3/avro/Period.java")); - Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/stu3/avro/Coding.java")); + Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/stu3/avro/PatientCoding.java")); Assert.assertTrue(javaFiles.contains("com/cerner/bunsen/stu3/avro/ValueSet.java")); // The specific profile should be created in the expected sub-package. diff --git a/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java b/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java index e8c477b61..dd43e60ae 100644 --- a/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java +++ b/bunsen/bunsen-core-r4/src/main/java/com/cerner/bunsen/definitions/r4/R4StructureDefinitions.java @@ -8,6 +8,7 @@ import com.cerner.bunsen.definitions.QualifiedPath; import com.cerner.bunsen.definitions.StructureDefinitions; import com.cerner.bunsen.definitions.StructureField; +import com.google.common.base.Verify; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; @@ -18,6 +19,7 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.hl7.fhir.r4.model.CanonicalType; import org.hl7.fhir.r4.model.ElementDefinition; import org.hl7.fhir.r4.model.ElementDefinition.TypeRefComponent; @@ -83,33 +85,24 @@ private 
StructureDefinition getDefinition(ElementDefinition element) { } private List> singleField(String elementName, T result) { - + if (result == null) { + return Collections.emptyList(); + } return Collections.singletonList(StructureField.property(elementName, result)); } - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, - StructureDefinition structureDefinition, - Deque stack) { - - ElementDefinition definitionRootElement = structureDefinition.getSnapshot().getElement().get(0); - - return shouldTerminateRecursive(visitor, structureDefinition, definitionRootElement, stack); - } - - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, - StructureDefinition rootDefinition, - ElementDefinition elementDefinition, Deque stack) { - - - return shouldTerminateRecursive(visitor, - new QualifiedPath(rootDefinition.getUrl(), elementDefinition.getPath()), - stack); - } - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, QualifiedPath newPath, Deque stack) { + // TODO we should add configuration parameters for exceptional paths + // that require deeper recursion; this is where to apply that logic: + // String elementPath = DefinitionVisitorsUtil.pathFromStack( + // DefinitionVisitorsUtil.elementName(newPath.getElementPath()), stack); + // if ("QuestionnaireResponse.item.item.item.item.item.item".startsWith(elementPath)) { + // return false; + // } + int maxDepth = visitor.getMaxDepth(newPath.getParentTypeUrl(), newPath.getElementPath()); return stack.stream().filter(path -> path.equals(newPath)).count() > maxDepth; @@ -149,26 +142,17 @@ private List> extensionElementToFields(DefinitionVisitor extensionDefinitions = definition.getSnapshot().getElement(); + List extensionDefinitions = definition.getSnapshot().getElement(); - ElementDefinition extensionRoot = extensionDefinitions.get(0); + ElementDefinition extensionRoot = extensionDefinitions.get(0); - extensions = visitExtensionDefinition(visitor, - rootDefinition, - 
element.getSliceName(), - stack, - definition.getUrl(), - extensionDefinitions, - extensionRoot); - } + extensions = visitExtensionDefinition(visitor, + rootDefinition, + element.getSliceName(), + stack, + definition.getUrl(), + extensionDefinitions, + extensionRoot); } else { @@ -286,11 +270,7 @@ private List> elementToFields(DefinitionVisitor visitor String elementName = DefinitionVisitorsUtil.elementName(element.getPath()); - if (shouldTerminateRecursive(visitor, rootDefinition, element, stack)) { - - return Collections.emptyList(); - - } else if (element.getMax().equals("0")) { + if (element.getMax().equals("0")) { // Fields with max of zero are omitted. return Collections.emptyList(); @@ -338,8 +318,12 @@ private List> elementToFields(DefinitionVisitor visitor (StructureDefinition) validationSupport .fetchStructureDefinition(typeRef.getCode()); + // TODO document why we are resetting the stack here; it is not clear + // why this cannot lead to infinite recursion for choice types. If + // we don't reset the stack, then we should handle null returns. T child = transform(visitor, element, structureDefinition, new ArrayDeque<>()); - + Verify.verify(child != null, + "Unexpected null choice type {} for element {}", typeRef, element); choiceTypes.put(typeRef.getCode(), child); } } @@ -360,22 +344,19 @@ private List> elementToFields(DefinitionVisitor visitor // Handle defined data types. 
StructureDefinition definition = getDefinition(element); - if (shouldTerminateRecursive(visitor, definition, stack)) { - - return Collections.emptyList(); - - } else { - - T type = transform(visitor, element, definition, stack); + T type = transform(visitor, element, definition, stack); - return singleField(elementName, - visitor.visitMultiValued(elementName, type)); - } + return singleField(elementName, + visitor.visitMultiValued(elementName, type)); } else { List> childElements = transformChildren(visitor, rootDefinition, definitions, stack, element); + if (childElements.isEmpty()) { + // All children were dropped because of recursion depth limit. + return Collections.emptyList(); + } T result = visitor.visitComposite(elementName, DefinitionVisitorsUtil.pathFromStack(elementName, stack), @@ -399,27 +380,26 @@ private List> elementToFields(DefinitionVisitor visitor } else if (getDefinition(element) != null) { + // TODO refactor this and the similar block above for handling defined data types. // Handle defined data types. StructureDefinition definition = getDefinition(element); - if (shouldTerminateRecursive(visitor, definition, stack)) { - - return Collections.emptyList(); - - } else { - T type = transform(visitor, element, definition, stack); + T type = transform(visitor, element, definition, stack); - return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type); - } + return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type); } else { // Handle composite type List> childElements = transformChildren(visitor, rootDefinition, definitions, stack, element); + if (childElements.isEmpty()) { + // All children were dropped because of recursion depth limit. 
+ return Collections.emptyList(); + } T result = visitor.visitComposite(elementName, - element.getPath(), + DefinitionVisitorsUtil.pathFromStack(elementName, stack), elementName, rootDefinition.getUrl(), childElements); @@ -428,6 +408,12 @@ private List> elementToFields(DefinitionVisitor visitor } } + /** + * Goes through the list of children of the given `element` and convert each + * of those `ElementDefinision`s to `StructureField`s. + * NOTE: This is the only place where the traversal stack can grow. It is also + * best if this is the only place where `shouldTerminateRecursive` is called. + */ private List> transformChildren(DefinitionVisitor visitor, StructureDefinition rootDefinition, List definitions, @@ -475,15 +461,13 @@ private StructureField transformContained(DefinitionVisitor visitor, List childDefinitions = containedDefinition.getSnapshot().getElement(); - stack.push(new QualifiedPath(containedDefinition.getUrl(), containedRootElement.getPath())); - List> childElements = transformChildren(visitor, containedDefinition, childDefinitions, stack, containedRootElement); - - stack.pop(); + // At this level no child should be dropped because of recursion limit. + Verify.verify(!childElements.isEmpty()); String rootName = DefinitionVisitorsUtil.elementName(containedRootElement.getPath()); @@ -511,26 +495,7 @@ public FhirConversionSupport conversionSupport() { @Override public T transform(DefinitionVisitor visitor, String resourceTypeUrl) { - - StructureDefinition definition = (StructureDefinition) context.getValidationSupport() - .fetchStructureDefinition(resourceTypeUrl); - - if (definition == null) { - - throw new IllegalArgumentException("Unable to find definition for " + resourceTypeUrl); - } - - return transform(visitor, definition); - } - - /** - * Returns the Spark struct type used to encode the given FHIR composite. 
- * - * @return The schema as a Spark StructType - */ - public T transform(DefinitionVisitor visitor, StructureDefinition definition) { - - return transform(visitor, null, definition, new ArrayDeque<>()); + return transform(visitor, resourceTypeUrl, Collections.emptyList()); } @Override @@ -562,9 +527,10 @@ public T transform(DefinitionVisitor visitor, }) .collect(Collectors.toList()); - return transformRoot(visitor, definition, containedDefinitions, new ArrayDeque<>()); + return transformRoot(visitor, definition, containedDefinitions); } + // TODO make the separation between this and `elementToFields` more clear. /** * Transforms the given FHIR structure definition. * @@ -576,6 +542,7 @@ public T transform(DefinitionVisitor visitor, * * @return the transformed structure, or null if it should not be included in the parent. */ + @Nullable private T transform(DefinitionVisitor visitor, ElementDefinition parentElement, StructureDefinition definition, @@ -585,13 +552,9 @@ private T transform(DefinitionVisitor visitor, ElementDefinition root = definitions.get(0); - stack.push(new QualifiedPath(definition.getUrl(), root.getPath())); - List> childElements = transformChildren(visitor, definition, definitions, stack, root); - stack.pop(); - if ("Reference".equals(definition.getType())) { // TODO: if this is in an option there may be other non-reference types here? @@ -628,8 +591,12 @@ private T transform(DefinitionVisitor visitor, // https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md#id-fields-omitted childElements.removeIf(field -> field.fieldName().equals("id")); + if (childElements.isEmpty()) { + // All children were dropped because of recursion depth limit. 
+ return null; + } return visitor.visitComposite(rootName, - rootName, + DefinitionVisitorsUtil.pathFromStack(root.getPath(), stack), rootName, definition.getUrl(), childElements); @@ -638,22 +605,22 @@ private T transform(DefinitionVisitor visitor, private T transformRoot(DefinitionVisitor visitor, StructureDefinition definition, - List containedDefinitions, - Deque stack) { + List containedDefinitions) { ElementDefinition definitionRootElement = definition.getSnapshot().getElementFirstRep(); List definitions = definition.getSnapshot().getElement(); - ElementDefinition root = definitions.get(0); - - stack.push(new QualifiedPath(definition.getUrl(), definitionRootElement.getPath())); + Deque stack = new ArrayDeque<>(); List> childElements = transformChildren(visitor, definition, definitions, stack, - root); + definitionRootElement); + // At this level no child should be dropped because of recursion limit. + Verify.verify(!childElements.isEmpty()); + Verify.verify(stack.isEmpty()); // If there are contained definitions, create a Resource Container StructureField if (containedDefinitions.size() > 0) { @@ -662,16 +629,14 @@ private T transformRoot(DefinitionVisitor visitor, definition, containedDefinitions, stack, - root); + definitionRootElement); // Replace default StructureField with constructed Resource Container StructureField // TODO make this future proof instead of using a hard-coded index for `contained`. 
childElements.set(5, containedElement); } - stack.pop(); - - String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); + String rootName = DefinitionVisitorsUtil.elementName(definitionRootElement.getPath()); return visitor.visitComposite(rootName, rootName, diff --git a/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java b/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java index 1a3436e10..e9de08953 100644 --- a/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java +++ b/bunsen/bunsen-core-stu3/src/main/java/com/cerner/bunsen/definitions/stu3/Stu3StructureDefinitions.java @@ -8,6 +8,7 @@ import com.cerner.bunsen.definitions.QualifiedPath; import com.cerner.bunsen.definitions.StructureDefinitions; import com.cerner.bunsen.definitions.StructureField; +import com.google.common.base.Verify; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; @@ -17,15 +18,20 @@ import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.hl7.fhir.dstu3.model.ElementDefinition; import org.hl7.fhir.dstu3.model.ElementDefinition.TypeRefComponent; import org.hl7.fhir.dstu3.model.StructureDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@link StructureDefinitions} implementation for FHIR STU3. 
*/ public class Stu3StructureDefinitions extends StructureDefinitions { + private static final Logger log = LoggerFactory.getLogger(Stu3StructureDefinitions.class); + private static final FhirConversionSupport CONVERSION_SUPPORT = new Stu3FhirConversionSupport(); public Stu3StructureDefinitions(FhirContext context) { @@ -73,29 +79,12 @@ private StructureDefinition getDefinition(ElementDefinition element) { } private List> singleField(String elementName, T result) { - + if (result == null) { + return Collections.emptyList(); + } return Collections.singletonList(StructureField.property(elementName, result)); } - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, - StructureDefinition structureDefinition, - Deque stack) { - - ElementDefinition definitionRootElement = structureDefinition.getSnapshot().getElement().get(0); - - return shouldTerminateRecursive(visitor, structureDefinition, definitionRootElement, stack); - } - - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, - StructureDefinition rootDefinition, - ElementDefinition elementDefinition, Deque stack) { - - - return shouldTerminateRecursive(visitor, - new QualifiedPath(rootDefinition.getUrl(), elementDefinition.getPath()), - stack); - } - private boolean shouldTerminateRecursive(DefinitionVisitor visitor, QualifiedPath newPath, Deque stack) { @@ -121,26 +110,17 @@ private List> extensionElementToFields(DefinitionVisitor extensionDefinitions = definition.getSnapshot().getElement(); + List extensionDefinitions = definition.getSnapshot().getElement(); - ElementDefinition extensionRoot = extensionDefinitions.get(0); + ElementDefinition extensionRoot = extensionDefinitions.get(0); - extensions = visitExtensionDefinition(visitor, - rootDefinition, - element.getSliceName(), - stack, - definition.getUrl(), - extensionDefinitions, - extensionRoot); - } + extensions = visitExtensionDefinition(visitor, + rootDefinition, + element.getSliceName(), + stack, + definition.getUrl(), + 
extensionDefinitions, + extensionRoot); } else { @@ -257,11 +237,7 @@ private List> elementToFields(DefinitionVisitor visitor String elementName = DefinitionVisitorsUtil.elementName(element.getPath()); - if (shouldTerminateRecursive(visitor, rootDefinition, element, stack)) { - - return Collections.emptyList(); - - } else if (element.getMax().equals("0")) { + if (element.getMax().equals("0")) { // Fields with max of zero are omitted. return Collections.emptyList(); @@ -303,8 +279,12 @@ private List> elementToFields(DefinitionVisitor visitor (StructureDefinition) validationSupport .fetchStructureDefinition(typeRef.getCode()); + // TODO document why we are resetting the stack here; it is not clear + // why this cannot lead to infinite recursion for choice types. If + // we don't reset the stack, then we should handle null returns. T child = transform(visitor, element, structureDefinition, new ArrayDeque<>()); - + Verify.verify(child != null, + "Unexpected null choice type {} for element {}", typeRef, element); choiceTypes.put(typeRef.getCode(), child); } } @@ -325,25 +305,22 @@ private List> elementToFields(DefinitionVisitor visitor // Handle defined data types. StructureDefinition definition = getDefinition(element); - if (shouldTerminateRecursive(visitor, definition, stack)) { - - return Collections.emptyList(); - - } else { - - T type = transform(visitor, element, definition, stack); + T type = transform(visitor, element, definition, stack); - return singleField(elementName, - visitor.visitMultiValued(elementName, type)); - } + return singleField(elementName, + visitor.visitMultiValued(elementName, type)); } else { List> childElements = transformChildren(visitor, rootDefinition, definitions, stack, element); + if (childElements.isEmpty()) { + // All children were dropped because of recursion depth limit. 
+ return Collections.emptyList();
+ }
 T result = visitor.visitComposite(elementName,
- element.getPath(),
+ DefinitionVisitorsUtil.pathFromStack(elementName, stack),
 elementName,
 rootDefinition.getUrl(),
 childElements);
@@ -364,24 +341,23 @@ private List> elementToFields(DefinitionVisitor visitor
 
 } else if (getDefinition(element) != null) {
 
+ // TODO refactor this and the similar block above for handling defined data types.
 // Handle defined data types.
 StructureDefinition definition = getDefinition(element);
 
- if (shouldTerminateRecursive(visitor, definition, stack)) {
-
- return Collections.emptyList();
-
- } else {
- T type = transform(visitor, element, definition, stack);
+ T type = transform(visitor, element, definition, stack);
 
- return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type);
- }
+ return singleField(DefinitionVisitorsUtil.elementName(element.getPath()), type);
 
 } else {
 // Handle composite type
 List> childElements =
 transformChildren(visitor, rootDefinition, definitions, stack, element);
+ if (childElements.isEmpty()) {
+ // All children were dropped because of recursion depth limit.
+ return Collections.emptyList();
+ }
 
 T result = visitor.visitComposite(elementName,
 DefinitionVisitorsUtil.pathFromStack(elementName, stack),
 elementName,
 rootDefinition.getUrl(),
 childElements);
@@ -393,6 +369,12 @@ private List> elementToFields(DefinitionVisitor visitor
 }
 }
 
+ /**
+ * Goes through the list of children of the given `element` and converts each
+ * of those `ElementDefinition`s to `StructureField`s.
+ * NOTE: This is the only place where the traversal stack can grow. It is also
+ * best if this is the only place where `shouldTerminateRecursive` is called.
+ */ private List> transformChildren(DefinitionVisitor visitor, StructureDefinition rootDefinition, List definitions, @@ -440,15 +422,13 @@ private StructureField transformContained(DefinitionVisitor visitor, List childDefinitions = containedDefinition.getSnapshot().getElement(); - stack.push(new QualifiedPath(containedDefinition.getUrl(), containedRootElement.getPath())); - List> childElements = transformChildren(visitor, containedDefinition, childDefinitions, stack, containedRootElement); - - stack.pop(); + // At this level no child should be dropped because of recursion limit. + Verify.verify(!childElements.isEmpty()); String rootName = DefinitionVisitorsUtil.elementName(containedRootElement.getPath()); @@ -476,26 +456,7 @@ public FhirConversionSupport conversionSupport() { @Override public T transform(DefinitionVisitor visitor, String resourceTypeUrl) { - - StructureDefinition definition = (StructureDefinition) context.getValidationSupport() - .fetchStructureDefinition(resourceTypeUrl); - - if (definition == null) { - - throw new IllegalArgumentException("Unable to find definition for " + resourceTypeUrl); - } - - return transform(visitor, definition); - } - - /** - * Returns the Spark struct type used to encode the given FHIR composite. - * - * @return The schema as a Spark StructType - */ - public T transform(DefinitionVisitor visitor, StructureDefinition definition) { - - return transform(visitor, null, definition, new ArrayDeque<>()); + return transform(visitor, resourceTypeUrl, Collections.emptyList()); } @Override @@ -527,7 +488,7 @@ public T transform(DefinitionVisitor visitor, }) .collect(Collectors.toList()); - return transformRoot(visitor, definition, containedDefinitions, new ArrayDeque<>()); + return transformRoot(visitor, definition, containedDefinitions); } /** @@ -541,6 +502,7 @@ public T transform(DefinitionVisitor visitor, * * @return the transformed structure, or null if it should not be included in the parent. 
*/ + @Nullable private T transform(DefinitionVisitor visitor, ElementDefinition parentElement, StructureDefinition definition, @@ -550,13 +512,9 @@ private T transform(DefinitionVisitor visitor, ElementDefinition root = definitions.get(0); - stack.push(new QualifiedPath(definition.getUrl(), root.getPath())); - List> childElements = transformChildren(visitor, definition, definitions, stack, root); - stack.pop(); - if ("Reference".equals(definition.getType())) { // TODO: if this is in an option there may be other non-reference types here? @@ -588,8 +546,12 @@ private T transform(DefinitionVisitor visitor, // https://github.com/FHIR/sql-on-fhir/blob/master/sql-on-fhir.md#id-fields-omitted childElements.removeIf(field -> field.fieldName().equals("id")); + if (childElements.isEmpty()) { + // All children were dropped because of recursion depth limit. + return null; + } return visitor.visitComposite(rootName, - rootName, + DefinitionVisitorsUtil.pathFromStack(root.getPath(), stack), rootName, definition.getUrl(), childElements); @@ -598,22 +560,22 @@ private T transform(DefinitionVisitor visitor, private T transformRoot(DefinitionVisitor visitor, StructureDefinition definition, - List containedDefinitions, - Deque stack) { + List containedDefinitions) { ElementDefinition definitionRootElement = definition.getSnapshot().getElementFirstRep(); List definitions = definition.getSnapshot().getElement(); - ElementDefinition root = definitions.get(0); - - stack.push(new QualifiedPath(definition.getUrl(), definitionRootElement.getPath())); + Deque stack = new ArrayDeque<>(); List> childElements = transformChildren(visitor, definition, definitions, stack, - root); + definitionRootElement); + // At this level no child should be dropped because of recursion limit. 
+ Verify.verify(!childElements.isEmpty()); + Verify.verify(stack.isEmpty()); // If there are contained definitions, create a Resource Container StructureField if (containedDefinitions.size() > 0) { @@ -622,15 +584,13 @@ private T transformRoot(DefinitionVisitor visitor, definition, containedDefinitions, stack, - root); + definitionRootElement); // Replace default StructureField with constructed Resource Container StructureField childElements.set(5, containedElement); } - stack.pop(); - - String rootName = DefinitionVisitorsUtil.elementName(root.getPath()); + String rootName = DefinitionVisitorsUtil.elementName(definitionRootElement.getPath()); return visitor.visitComposite(rootName, rootName, diff --git a/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/HapiCompositeConverter.java b/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/HapiCompositeConverter.java index d40ddf11b..0295f2180 100644 --- a/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/HapiCompositeConverter.java +++ b/bunsen/bunsen-core/src/main/java/com/cerner/bunsen/definitions/HapiCompositeConverter.java @@ -4,6 +4,7 @@ import ca.uhn.fhir.context.BaseRuntimeElementCompositeDefinition; import ca.uhn.fhir.context.BaseRuntimeElementDefinition; import ca.uhn.fhir.context.RuntimeElemContainedResourceList; +import com.google.common.base.Preconditions; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -144,7 +145,8 @@ protected HapiCompositeConverter(String elementType, T structType, FhirConversionSupport fhirSupport, String extensionUrl) { - + // A composite type should have at least one child. 
+ Preconditions.checkArgument(!children.isEmpty()); this.elementType = elementType; this.children = children; this.structType = structType; diff --git a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java index b7f160dbe..5effbce76 100644 --- a/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java +++ b/pipelines/batch/src/main/java/org/openmrs/analytics/ConvertResourceFn.java @@ -55,7 +55,9 @@ public class ConvertResourceFn extends FetchSearchPageFn { private final Boolean processDeletedRecords; - Counter counter = Metrics.counter(MetricsConstants.METRICS_NAMESPACE, MetricsConstants.DATA_FORMAT_EXCEPTION_ERROR); + Counter counter = + Metrics.counter( + MetricsConstants.METRICS_NAMESPACE, MetricsConstants.DATA_FORMAT_EXCEPTION_ERROR); ConvertResourceFn(FhirEtlOptions options, String stageIdentifier) { super(options, stageIdentifier); @@ -121,8 +123,7 @@ public void writeResource(HapiRowDescriptor element) } else { try { resource = (Resource) parser.parseResource(jsonResource); - } - catch (DataFormatException e) { + } catch (DataFormatException e) { log.error("DataFormatException Error occurred", e); counter.inc(); return; diff --git a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java index 898561b91..3c4dd2257 100644 --- a/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java +++ b/pipelines/batch/src/test/java/org/openmrs/analytics/JdbcFetchHapiTest.java @@ -111,8 +111,8 @@ public void testSearchResourceCounts() throws SQLException { Mockito.when(mockedDataSource.getConnection()).thenReturn(mockedConnection); Mockito.when( mockedConnection.prepareStatement( - "SELECT count(*) as count FROM hfj_resource res where res.res_type = ? 
AND res.res_updated >" - + " '" + "SELECT count(*) as count FROM hfj_resource res where res.res_type = ? AND" + + " res.res_updated > '" + since + "'")) .thenReturn(mockedPreparedStatement);