agent: update inferred schemas on publish
Updates the agent to always update collection inferred schemas on any
non-touch collection publication. This makes it impossible for a user
to accidentally clobber an embedded inferred schema definition.

Also removes inferred schema handling from the `validation` crate, since
the drafted specs will already contain the updated schemas. Now all
validation needs to do is bundle the write schema if necessary.
psFried committed Oct 4, 2024
1 parent 621dbef commit 4e6f5ea
Showing 18 changed files with 204 additions and 463 deletions.
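As a rough standalone sketch of the publish-time flow the message above describes (stand-in types and synchronous signatures for illustration; the crate's real `Initialize` trait is async and takes a `sqlx::PgPool` and user id):

// A minimal sketch of the initializer-combinator pattern this commit leans on.
// `Draft` stands in for tables::DraftCatalog; behavior is illustrative only.
struct Draft {
    collections: Vec<String>,
}

trait Initialize {
    fn initialize(&self, draft: &mut Draft) -> Result<(), String>;
}

struct UpdateInferredSchemas;
impl Initialize for UpdateInferredSchemas {
    fn initialize(&self, draft: &mut Draft) -> Result<(), String> {
        // Stand-in for fetching current inferred schemas and embedding them
        // into each drafted collection's read schema bundle.
        for collection in &mut draft.collections {
            collection.push_str(" (inferred schema refreshed)");
        }
        Ok(())
    }
}

struct ExpandDraft;
impl Initialize for ExpandDraft {
    fn initialize(&self, draft: &mut Draft) -> Result<(), String> {
        // Stand-in for pulling connected live specs into the draft.
        draft.collections.push("acmeCo/connected-spec".to_string());
        Ok(())
    }
}

// A tuple of initializers runs each member in order; the publications
// handler composes UpdateInferredSchemas with ExpandDraft exactly this
// way in the diffs below.
impl<A: Initialize, B: Initialize> Initialize for (A, B) {
    fn initialize(&self, draft: &mut Draft) -> Result<(), String> {
        self.0.initialize(draft)?;
        self.1.initialize(draft)
    }
}

fn main() {
    let mut draft = Draft { collections: vec!["acmeCo/orders".to_string()] };
    (UpdateInferredSchemas, ExpandDraft).initialize(&mut draft).unwrap();
    println!("{:?}", draft.collections);
}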
30 changes: 9 additions & 21 deletions crates/agent/src/controllers/collection.rs
@@ -133,7 +133,11 @@ pub struct InferredSchemaStatus {
#[serde(default, skip_serializing_if = "Option::is_none")]
#[schemars(schema_with = "super::datetime_schema")]
pub schema_last_updated: Option<DateTime<Utc>>,
/// The md5 sum of the inferred schema that was last published
/// The md5 sum of the inferred schema that was last published.
/// Because the publications handler, not the controller, updates the model, it's
/// technically possible for the published inferred schema to be more recent than the one
/// corresponding to this hash. If that happens, we would expect a subsequent publication
/// on the next controller run, which would update the hash but not actually modify the schema.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schema_md5: Option<String>,
}
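As a hedged illustration of the staleness check described in the comment above (hypothetical helper, not the controller's actual code; assumes the `md5` crate):

// Hypothetical sketch: a publication is warranted when the stored hash no
// longer matches the md5 of the latest inferred schema. When the publications
// handler already embedded a newer schema, this still fires once more and
// updates the hash without actually modifying the schema.
fn needs_publish(last_md5: Option<&str>, latest_schema_json: &str) -> bool {
    let latest = format!("{:x}", md5::compute(latest_schema_json));
    last_md5 != Some(latest.as_str())
}

fn main() {
    let stored = Some("d41d8cd98f00b204e9800998ecf8427e"); // md5 of ""
    println!("{}", needs_publish(stored, r#"{"type":"object"}"#));
}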
@@ -191,7 +195,7 @@ impl InferredSchemaStatus {
if let Some(inferred_schema) = maybe_inferred_schema {
let tables::InferredSchema {
collection_name,
schema,
schema: _, // we let the publications handler set the inferred schema
md5,
} = inferred_schema;

@@ -216,7 +220,9 @@ impl InferredSchemaStatus {
is_touch: false, // We intend to update the model
}
});
update_inferred_schema(draft_row, &schema)?;
// The inferred schema is always updated as part of any non-touch publication,
// so we don't need to actually update the model here.
draft_row.is_touch = false;

let pub_result = pending_pub
.finish(state, publication_status, control_plane)
@@ -251,24 +257,6 @@ fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool {
>= 3
}

fn update_inferred_schema(
collection: &mut tables::DraftCollection,
inferred_schema: &models::Schema,
) -> anyhow::Result<()> {
let Some(model) = collection.model.as_mut() else {
anyhow::bail!("missing model to update inferred schema");
};
let new_read_schema = {
let Some(read_schema) = model.read_schema.as_ref() else {
anyhow::bail!("model is missing read schema");
};
models::Schema::extend_read_bundle(read_schema, None, Some(inferred_schema))
};

model.read_schema = Some(new_read_schema);
Ok(())
}

pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool {
collection
.read_schema
@@ -97,7 +97,7 @@ expression: schema
"type": "string"
},
"schema_md5": {
"description": "The md5 sum of the inferred schema that was last published",
"description": "The md5 sum of the inferred schema that was last published. Because the publications handler updates the model instead of the controller, it's technically possible for the published inferred schema to be more recent than the one corresponding to this hash. If that happens, we would expect a subsequent publication on the next controller run, which would update the hash but not actually modify the schema.",
"type": [
"string",
"null"
4 changes: 2 additions & 2 deletions crates/agent/src/controlplane.rs
@@ -9,7 +9,7 @@ use std::{collections::BTreeSet, ops::Deref};

use crate::publications::{
DefaultRetryPolicy, DraftPublication, JobStatus, NoExpansion, NoopFinalize, PublicationResult,
Publisher,
Publisher, UpdateInferredSchemas,
};

macro_rules! unwrap_single {
@@ -363,7 +363,7 @@ impl ControlPlane for PGControlPlane {
default_data_plane_name: None,
// skip authz checks for controller-initiated publications
verify_user_authz: false,
initialize: NoExpansion,
initialize: UpdateInferredSchemas,
finalize: NoopFinalize,
retry: DefaultRetryPolicy,
};
8 changes: 2 additions & 6 deletions crates/agent/src/integration_tests/harness.rs
@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::sync::{Arc, Mutex};

use crate::publications::{DefaultRetryPolicy, NoExpansion};
use crate::publications::{DefaultRetryPolicy, UpdateInferredSchemas};
use crate::{
controllers::{ControllerHandler, ControllerState},
controlplane::ConnectorSpec,
@@ -1057,10 +1057,6 @@ impl ControlPlane for TestControlPlane {
self.inner.current_time()
}

/// Tests use a custom publish loop, so that failures can be injected into
/// the build. This is admittedly a little gross, but at least it's pretty
/// simple. And I'm hopeful that a better factoring of the `Publisher` will
/// one day allow this to be replaced with something less bespoke.
async fn publish(
&mut self,
detail: Option<String>,
@@ -1076,7 +1072,7 @@ impl ControlPlane for TestControlPlane {
dry_run: false,
default_data_plane_name: Some("ops/dp/public/test".to_string()),
verify_user_authz: false,
initialize: NoExpansion,
initialize: UpdateInferredSchemas,
finalize,
retry: DefaultRetryPolicy,
};
4 changes: 1 addition & 3 deletions crates/agent/src/integration_tests/null_bytes.rs
@@ -1,7 +1,5 @@
use super::harness::{draft_catalog, md5_hash, TestHarness};
use crate::{controllers::ControllerState, publications::JobStatus, ControlPlane};
use agent_sql::Capability;
use models::{CatalogType, Id};
use crate::publications::JobStatus;
use tables::InferredSchema;

#[tokio::test]
15 changes: 14 additions & 1 deletion crates/agent/src/integration_tests/schema_evolution.rs
@@ -147,8 +147,21 @@ async fn test_schema_evolution() {
assert!(initial_result.status.is_success());
harness.run_pending_controllers(None).await;

// Assert that the pasture collection has had the inferred schema placeholder added
// Assert that the pasture collection has had the inferred schema placeholder added in both
// the model and the built spec.
let pasture_state = harness.get_controller_state("goats/pasture").await;
let pasture_model = pasture_state
.live_spec
.as_ref()
.unwrap()
.as_collection()
.unwrap();
assert!(pasture_model
.read_schema
.as_ref()
.unwrap()
.get()
.contains("inferredSchemaIsNotAvailable"));
let pasture_spec = unwrap_built_collection(&pasture_state);
assert!(pasture_spec
.read_schema_json
8 changes: 1 addition & 7 deletions crates/agent/src/publications.rs
@@ -17,7 +17,7 @@ mod quotas;
pub mod specs;

pub use self::finalize::{FinalizeBuild, NoopFinalize, PruneUnboundCollections};
pub use self::initialize::{ExpandDraft, Initialize, NoExpansion};
pub use self::initialize::{ExpandDraft, Initialize, NoExpansion, UpdateInferredSchemas};
pub use self::retry::{DefaultRetryPolicy, DoNotRetry, RetryPolicy};
pub use self::status::{
get_incompatible_collections, AffectedConsumer, IncompatibleCollection, JobStatus, LockFailure,
@@ -429,15 +429,9 @@ impl Publisher {
});
}

let inferred_schemas = live_catalog
.inferred_schemas
.iter()
.map(|s| s.collection_name.as_str())
.collect::<Vec<_>>();
let live_spec_names = live_catalog.all_spec_names().collect::<Vec<_>>();
let draft_spec_names = draft.all_spec_names().collect::<Vec<_>>();
tracing::debug!(
?inferred_schemas,
?live_spec_names,
?draft_spec_names,
"resolved publication specs"
14 changes: 9 additions & 5 deletions crates/agent/src/publications/handler.rs
@@ -5,8 +5,9 @@ use tracing::info;
use crate::{
draft,
publications::{
specs, DefaultRetryPolicy, DraftPublication, ExpandDraft, IncompatibleCollection,
JobStatus, PruneUnboundCollections, PublicationResult, Publisher,
initialize::UpdateInferredSchemas, specs, DefaultRetryPolicy, DraftPublication,
ExpandDraft, IncompatibleCollection, JobStatus, PruneUnboundCollections, PublicationResult,
Publisher,
},
HandleResult, Handler,
};
@@ -132,9 +133,12 @@ impl Publisher {
draft,
verify_user_authz: true,
default_data_plane_name: Some(row.data_plane_name.clone()).filter(|s| !s.is_empty()),
initialize: ExpandDraft {
filter_user_has_admin: true,
},
initialize: (
UpdateInferredSchemas,
ExpandDraft {
filter_user_has_admin: true,
},
),
finalize: PruneUnboundCollections,
retry: DefaultRetryPolicy,
};
78 changes: 76 additions & 2 deletions crates/agent/src/publications/initialize.rs
@@ -1,9 +1,10 @@
use itertools::Itertools;
use models::Capability;
use std::future::Future;
use std::{collections::BTreeMap, future::Future};
use uuid::Uuid;

/// Initialize a draft prior to build/validation. This may add additional specs to the draft.
pub trait Initialize {
pub trait Initialize: Send + Sync {
fn initialize(
&self,
db: &sqlx::PgPool,
@@ -25,6 +26,79 @@ impl Initialize for NoExpansion {
}
}

pub struct UpdateInferredSchemas;
impl Initialize for UpdateInferredSchemas {
async fn initialize(
&self,
db: &sqlx::PgPool,
_user_id: Uuid,
draft: &mut tables::DraftCatalog,
) -> anyhow::Result<()> {
let collection_names = draft
.collections
.iter()
.filter(|r| uses_inferred_schema(*r))
.map(|c| c.collection.as_str())
.collect::<Vec<_>>();
let rows = agent_sql::live_specs::fetch_inferred_schemas(&collection_names, db).await?;
tracing::debug!(
inferred_schemas = %rows.iter().map(|r| r.collection_name.as_str()).format(", "),
"fetched inferred schemas"
);
let mut by_name = rows
.into_iter()
.map(|r| (r.collection_name, r.schema.0))
.collect::<BTreeMap<_, _>>();

for drafted in draft
.collections
.iter_mut()
.filter(|r| uses_inferred_schema(*r))
{
let maybe_inferred = by_name
.remove(drafted.collection.as_str())
.map(|json| models::Schema::new(json.into()));

let draft_model = drafted.model.as_mut().unwrap();
let draft_read_schema = draft_model.read_schema.take().unwrap();

let new_schema = models::Schema::extend_read_bundle(
&draft_read_schema,
None,
maybe_inferred.as_ref(),
);
draft_model.read_schema = Some(new_schema);
}
Ok(())
}
}

fn uses_inferred_schema(c: &tables::DraftCollection) -> bool {
!c.is_touch
&& c.model.as_ref().is_some_and(|s| {
s.read_schema
.as_ref()
.is_some_and(models::Schema::references_inferred_schema)
})
}

impl<I1, I2> Initialize for (I1, I2)
where
I1: Initialize,
I2: Initialize,
{
async fn initialize(
&self,
db: &sqlx::PgPool,
user_id: Uuid,
draft: &mut tables::DraftCatalog,
) -> anyhow::Result<()> {
self.0.initialize(db, user_id, draft).await?;
self.1.initialize(db, user_id, draft).await?;
Ok(())
}
}

/// An `Initialize` that expands the draft to touch live specs that read from or write to
/// any drafted collections. This may optionally filter the specs based on whether the user
/// has `admin` capability to them.
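For context on the `uses_inferred_schema` filter above: it matches drafted collections whose read schema `$ref`s the inferred-schema URL. A sketch of such a schema, with the shape assumed for illustration:

use serde_json::json;

fn main() {
    // Assumed shape of a read schema that references_inferred_schema would
    // detect: it $ref's flow://inferred-schema, which publication replaces
    // with the current inferred schema (or with the
    // "inferredSchemaIsNotAvailable" sentinel when none exists yet).
    let read_schema = json!({
        "allOf": [
            { "$ref": "flow://inferred-schema" },
            { "$ref": "flow://write-schema" }
        ]
    });
    println!("{read_schema}");
}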
79 changes: 55 additions & 24 deletions crates/models/src/schemas.rs
@@ -102,38 +102,21 @@ impl Schema {
) -> Self {
use serde_json::{value::to_raw_value, Value};

let mut read_schema: Skim = serde_json::from_str(read_bundle.get()).unwrap();
let mut read_defs: Skim = read_schema
.get(KEYWORD_DEF)
.map(|d| serde_json::from_str(d.get()).unwrap())
.unwrap_or_default();

// Add a definition for the write schema if it's referenced.
// We cannot add it in all cases because the existing `read_bundle` and
// `write_bundle` may have a common sub-schema defined, and naively adding
// it would result in an indexing error due to the duplicate definition.
// So, we treat $ref: flow://write-schema as a user assertion that there is
// no such conflicting definition (and we may produce an indexing error
// later if they're wrong).
if let Some(write_schema_json) =
write_bundle.filter(|_| read_bundle.references_write_schema())
{
let mut write_schema: Skim = serde_json::from_str(write_schema_json.get()).unwrap();

// Set $id to "flow://write-schema".
_ = write_schema.insert(
KEYWORD_ID.to_string(),
RawValue::from_value(&Value::String(Self::REF_WRITE_SCHEMA_URL.to_string())),
);
// Add as a definition within the read schema.
read_defs.insert(
Self::REF_WRITE_SCHEMA_URL.to_string(),
to_raw_value(&write_schema).unwrap().into(),
);
}
let prepared_write_schema = write_bundle
.filter(|_| read_bundle.references_write_schema())
.map(|write_schema_json| {
Schema::add_id(Schema::REF_WRITE_SCHEMA_URL, write_schema_json)
});

// Add a definition for the inferred schema if it's referenced.
if read_bundle.references_inferred_schema() {
let prepared_inferred_schema = if read_bundle.references_inferred_schema() {
// Prefer the actual inferred schema, or fall back to a sentinel schema
// which allows for validations but fails on the first document.
let inferred_bundle = inferred_bundle.map(|s| s.get()).unwrap_or(
Expand Down Expand Up @@ -166,10 +149,58 @@ impl Schema {
KEYWORD_ID.to_string(),
Value::String(Self::REF_INFERRED_SCHEMA_URL.to_string()),
);
Some(to_raw_value(&inferred_schema).unwrap().into())
} else {
None
};

Schema::add_defs(read_bundle, prepared_write_schema, prepared_inferred_schema)
}

pub fn bundle_write_schema_def(read_schema: &Schema, write_schema: &Schema) -> Schema {
let prepared_write_schema = Schema::add_id(Schema::REF_WRITE_SCHEMA_URL, write_schema);
Schema::add_defs(read_schema, Some(prepared_write_schema), None)
}

fn add_id(id: &str, schema: &Schema) -> RawValue {
let mut skim: Skim = serde_json::from_str(schema.get()).unwrap();

_ = skim.insert(
KEYWORD_ID.to_string(),
RawValue::from_value(&serde_json::Value::String(id.to_string())),
);
serde_json::value::to_raw_value(&skim).unwrap().into()
}

fn add_defs(
target: &Schema,
write_schema: Option<RawValue>,
inferred_schema: Option<RawValue>,
) -> Schema {
use serde_json::value::to_raw_value;

let mut read_schema: Skim = serde_json::from_str(target.get()).unwrap();
let mut read_defs: Skim = read_schema
.get(KEYWORD_DEF)
.map(|d| serde_json::from_str(d.get()).unwrap())
.unwrap_or_default();

// Add a definition for the write schema if it's referenced.
// We cannot add it in all cases because the existing `read_bundle` and
// `write_bundle` may have a common sub-schema defined, and naively adding
// it would result in an indexing error due to the duplicate definition.
// So, we treat $ref: flow://write-schema as a user assertion that there is
// no such conflicting definition (and we may produce an indexing error
// later if they're wrong).
if let Some(write_schema_json) = write_schema {
// Add as a definition within the read schema.
read_defs.insert(Self::REF_WRITE_SCHEMA_URL.to_string(), write_schema_json);
}

if let Some(inferred_schema_json) = inferred_schema {
read_defs.insert(
Self::REF_INFERRED_SCHEMA_URL.to_string(),
to_raw_value(&inferred_schema).unwrap().into(),
inferred_schema_json,
);
}

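To make the bundling concrete, a small self-contained illustration (assumed document shapes, not the crate's code) of what `add_id` plus `add_defs` produce: the write schema gains an `$id` of `flow://write-schema` and is embedded under the read schema's `$defs`, so the `$ref` resolves within a single bundled document.

use serde_json::json;

fn main() {
    // A read schema whose $ref asserts that no conflicting write-schema
    // definition already exists under $defs.
    let read = json!({
        "$defs": {},
        "allOf": [{ "$ref": "flow://write-schema" }]
    });
    let write = json!({ "type": "object", "required": ["id"] });

    // Mirror of add_id: stamp the canonical $id onto the write schema.
    let mut write_with_id = write.clone();
    write_with_id["$id"] = json!("flow://write-schema");

    // Mirror of add_defs: embed it under $defs, keyed by the same URL.
    let mut bundled = read.clone();
    bundled["$defs"]["flow://write-schema"] = write_with_id;

    println!("{}", serde_json::to_string_pretty(&bundled).unwrap());
}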