Skip to content

Commit

Permalink
fixed some logic for subgraphs allowing explicit versioning, improve…
Browse files Browse the repository at this point in the history
…d error messages, general clean up
  • Loading branch information
EvanDietzMorris committed Aug 23, 2024
1 parent e26eb17 commit e9f49e4
Showing 1 changed file with 62 additions and 47 deletions.
109 changes: 62 additions & 47 deletions Common/build_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
REDUNDANT_EDGES_FILENAME = 'redundant_edges.jsonl'


class GraphSpecError(Exception):
def __init__(self, error_message: str, actual_error: Exception = None):
self.error_message = error_message
self.actual_error = actual_error


class GraphBuilder:

def __init__(self):
Expand All @@ -33,9 +39,9 @@ def __init__(self):
line_format='medium',
log_file_path=os.environ['ORION_LOGS'])

#This dictionary determines the graph versions of already parsed graphs.
#This is more tempermental than it seems because the only way to get
#the version name for many subgraphs is to download it.
# This dictionary determines the graph versions of already parsed graphs.
# This is more temperamental than it seems because the only way to get
# the version name for many subgraphs is to download it.
self.graph_id_to_version = {}

self.graphs_dir = self.init_graphs_dir() # path to the graphs output directory
Expand Down Expand Up @@ -138,32 +144,49 @@ def build_graph(self, graph_id: str):
redundant_filepath = edges_filepath.replace(EDGES_FILENAME, REDUNDANT_EDGES_FILENAME)
generate_redundant_kg(edges_filepath, redundant_filepath)

def get_graph_version(self, graph_id:str) -> str:
if(graph_id not in self.graph_id_to_version):
def get_graph_version(self, graph_id: str) -> str:
if graph_id not in self.graph_id_to_version:
graph_spec = self.get_graph_spec(graph_id)
if not graph_spec:
raise GraphSpecError(error_message=f'Tried to dynamically determine the version of a '
f'graph that was not found in the Graph Spec.')
graph_version = self.generate_graph_version(graph_spec)
self.graph_id_to_version[graph_id] = graph_version
return self.graph_id_to_version[graph_id]


def build_dependencies(self, graph_spec: GraphSpec):
for subgraph_source in graph_spec.subgraphs:
subgraph_id = subgraph_source.id
subgraph_version = self.get_graph_version(subgraph_id)
subgraph_version = subgraph_source.version
# Get the subgraph version from the subgraph source spec,
# which will either be one specified in the graph spec or None.
if not subgraph_version:
try:
# if one was not specified, retrieve or generate it like we would any graph version
subgraph_version = self.get_graph_version(subgraph_id)
except GraphSpecError:
self.logger.error(f'Could not determine version of subgraph {subgraph_id}. '
f'Either specify an existing version of the graph, or the subgraph must '
f'be defined in the same Graph Spec.')
return False
if self.check_for_existing_graph_dir(subgraph_id, subgraph_version):
# load previous metadata
# load previous metadata if the specified subgraph version was already built
graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version)
subgraph_source.graph_metadata = graph_metadata.metadata
elif self.graphid_to_version_name[subgraph_id] == subgraph_version:
self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency '
f'{subgraph_id} version {subgraph_version} is not ready. Building now...')
self.build_graph(subgraph_id)
#With changes here, this line should never run...
else:
self.logger.warning(f'Building graph {graph_spec.graph_id} failed, '
f'subgraph {subgraph_id} had version {subgraph_version} specified, '
f'but that version of the graph was not found in the graphs directory.')
return False
# If the subgraph doesn't already exist, we need to make sure it matches the current version of the
# subgraph as generated by the current graph spec, otherwise we won't be able to build it.
current_subgraph_version = self.get_graph_version(subgraph_id)
if subgraph_version == current_subgraph_version:
self.logger.warning(f'For graph {graph_spec.graph_id} subgraph dependency '
f'{subgraph_id} is not ready. Building now...')
self.build_graph(subgraph_id)
else:
self.logger.error(f'Subgraph ({subgraph_id}) version ({subgraph_version}) was specified, but that '
f'version of the graph could not be found. It can not be built now because the '
f'current version is {current_subgraph_version}. Either specify a version that '
f'is already built, or leave the subgraph version blank to automatically '
f'build the new one.')

graph_metadata = self.get_graph_metadata(subgraph_id, subgraph_version)
if graph_metadata.get_build_status() == Metadata.STABLE:
Expand Down Expand Up @@ -370,7 +393,7 @@ def parse_graph_spec(self, graph_spec_yaml):
graph_wide_normalization_code_version = graph_yaml['normalization_code_version'] \
if 'normalization_code_version' in graph_yaml else None

# apply them to all of the data sources, this will overwrite anything defined at the source level
# apply them to all the data sources, this will overwrite anything defined at the source level
for data_source in data_sources:
if graph_wide_node_norm_version is not None:
data_source.normalization_scheme.node_normalization_version = graph_wide_node_norm_version
Expand All @@ -385,13 +408,13 @@ def parse_graph_spec(self, graph_spec_yaml):

graph_output_format = graph_yaml['output_format'] if 'output_format' in graph_yaml else ""
graph_spec = GraphSpec(graph_id=graph_id,
graph_name=graph_name,
graph_description=graph_description,
graph_url=graph_url,
graph_version=None, # this will get populated later
graph_output_format=graph_output_format,
subgraphs=subgraph_sources,
sources=data_sources)
graph_name=graph_name,
graph_description=graph_description,
graph_url=graph_url,
graph_version=None, # this will get populated later
graph_output_format=graph_output_format,
subgraphs=subgraph_sources,
sources=data_sources)
graph_specs.append(graph_spec)
except Exception as e:
self.logger.error(f'Error parsing Graph Spec ({graph_id}), formatting error or missing information: {repr(e)}')
Expand All @@ -400,17 +423,7 @@ def parse_graph_spec(self, graph_spec_yaml):

def parse_subgraph_spec(self, subgraph_yml):
subgraph_id = subgraph_yml['graph_id']

if 'graph_version' in subgraph_yml: subgraph_version = subgraph_yml['graph_version']
else: subgraph_version= 'latest'

# if subgraph_version == 'current':
# if subgraph_id in self.current_graph_versions:
# subgraph_version = self.current_graph_versions[subgraph_id]
# else:
# raise Exception(f'Graph Spec Error - Could not determine version of subgraph {subgraph_id}. '
# f'Either specify an existing version, already built in your graphs directory, '
# f'or the subgraph must be defined previously in the same Graph Spec.')
subgraph_version = subgraph_yml['graph_version'] if 'graph_version' in subgraph_yml else None
merge_strategy = subgraph_yml['merge_strategy'] if 'merge_strategy' in subgraph_yml else 'default'
subgraph_source = SubGraphSource(id=subgraph_id,
version=subgraph_version,
Expand All @@ -425,10 +438,15 @@ def parse_data_source_spec(self, source_yml):
f'Valid sources are: {", ".join(get_available_data_sources())}')
raise Exception(error_message)

if 'source_version' not in source_yml or source_yml['source_version']=='latest':
# The DataSource() will get initialized with either a specific source version, if specified,
# or a callable function which can determine the latest source version. This is for a lazy initialization
# technique, so that we don't call get_latest_source_version until we need to, if at all.
if 'source_version' not in source_yml or source_yml['source_version'] == 'latest':
get_source_version = self.source_data_manager.get_latest_source_version
source_version = None
else:
get_source_version = lambda source_id=None : str(source_yml['source_version'])
source_version = str(source_yml['source_version'])
get_source_version = None

parsing_version = source_yml['parsing_version'] if 'parsing_version' in source_yml \
else self.source_data_manager.get_latest_parsing_version(source_id)
Expand All @@ -452,13 +470,12 @@ def parse_data_source_spec(self, source_yml):
conflation=conflation)
supplementation_version = SequenceVariantSupplementation.SUPPLEMENTATION_VERSION
data_source = DataSource(id=source_id,
version=None, # this will get populated later in build_dependencies
get_source_version=get_source_version,
# source_version=source_version,
merge_strategy=merge_strategy,
normalization_scheme=normalization_scheme,
parsing_version=parsing_version,
supplementation_version=supplementation_version)
source_version=source_version,
get_source_version=get_source_version,
merge_strategy=merge_strategy,
normalization_scheme=normalization_scheme,
parsing_version=parsing_version,
supplementation_version=supplementation_version)
return data_source

def get_graph_spec(self, graph_id: str):
Expand Down Expand Up @@ -491,8 +508,6 @@ def check_for_existing_graph_dir(self, graph_id: str, graph_version: str):
def get_graph_metadata(self, graph_id: str, graph_version: str):
# make sure the output directory exists (where we check for existing GraphMetadata)
graph_output_dir = self.get_graph_dir_path(graph_id, graph_version)
if not os.path.isdir(graph_output_dir):
os.makedirs(graph_output_dir)

# load existing or create new metadata file
return GraphMetadata(graph_id, graph_output_dir)
Expand Down

0 comments on commit e9f49e4

Please sign in to comment.