Skip to content

Commit

Permalink
agent: retry on Build/PublicationSuperseded errors
Browse files Browse the repository at this point in the history
Updates the default publication retry policy to retry `BuildSuperseded`
and `PublicationSuperseded` errors. This addresses a portion of #1520
  • Loading branch information
psFried committed Sep 27, 2024
1 parent 3b2b57a commit c59397b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 23 deletions.
31 changes: 10 additions & 21 deletions crates/agent/src/integration_tests/dependencies_and_activations.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::BTreeSet;

use super::harness::{self, draft_catalog, TestHarness};
use crate::{
controllers::ControllerState, integration_tests::harness::mock_inferred_schema, ControlPlane,
controllers::ControllerState,
integration_tests::harness::{
draft_catalog, mock_inferred_schema, InjectBuildError, TestHarness,
},
ControlPlane,
};
use models::CatalogType;
use uuid::Uuid;
Expand Down Expand Up @@ -460,12 +463,12 @@ async fn test_dependencies_and_controllers() {

harness.control_plane().fail_next_build(
"owls/materialize",
BuildFailure {
catalog_name: "owls/materialize",
catalog_type: CatalogType::Materialization,
},
InjectBuildError::new(
tables::synthetic_scope("materialization", "owls/materialize"),
anyhow::anyhow!("simulated build failure"),
),
);

harness.control_plane().reset_activations();
let runs = harness.run_pending_controllers(None).await;
assert_controllers_ran(&["owls/capture", "owls/materialize"], runs);

Expand Down Expand Up @@ -548,20 +551,6 @@ async fn test_dependencies_and_controllers() {
.await;
}

#[derive(Debug)]
struct BuildFailure {
catalog_name: &'static str,
catalog_type: CatalogType,
}
impl harness::FailBuild for BuildFailure {
fn modify(&mut self, result: &mut crate::publications::UncommittedBuild) {
result.output.built.errors.insert(tables::Error {
scope: tables::synthetic_scope(self.catalog_type, self.catalog_name),
error: anyhow::anyhow!("simulated build failure"),
});
}
}

fn assert_controllers_ran(expected: &[&str], actual: Vec<ControllerState>) {
let actual_names = actual
.iter()
Expand Down
16 changes: 16 additions & 0 deletions crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,22 @@ pub trait FailBuild: std::fmt::Debug + Send + 'static {
fn modify(&mut self, result: &mut UncommittedBuild);
}

/// A [`FailBuild`] implementation that injects one pre-constructed
/// `tables::Error` into the errors of a build's output.
///
/// The error is stored in an `Option` so that `modify`, which only receives
/// `&mut self`, can move it out into the build output.
#[derive(Debug)]
pub struct InjectBuildError(Option<tables::Error>);

impl InjectBuildError {
    /// Creates an injector holding an error with the given `scope`.
    ///
    /// `err` may be any value convertible into an `anyhow::Error`, so typed
    /// errors can be passed directly.
    pub fn new(scope: url::Url, err: impl Into<anyhow::Error>) -> InjectBuildError {
        InjectBuildError(Some(tables::Error {
            scope,
            error: err.into(),
        }))
    }
}

impl FailBuild for InjectBuildError {
    /// Moves the stored error into `result.output.built.errors`.
    ///
    /// # Panics
    ///
    /// Panics if called more than once on the same value, because the stored
    /// error has already been moved into an earlier build output.
    fn modify(&mut self, result: &mut UncommittedBuild) {
        let injected = self
            .0
            .take()
            .expect("InjectBuildError::modify called more than once: error was already injected");
        result.output.built.errors.insert(injected);
    }
}

/// A wrapper around `PGControlPlane` that has a few basic capabilities for verifying
/// activation calls and simulating failures of activations and publications.
pub struct TestControlPlane {
Expand Down
43 changes: 41 additions & 2 deletions crates/agent/src/integration_tests/locking_retries.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use models::Id;
use models::{CatalogType, Id};
use uuid::Uuid;

use crate::{
integration_tests::harness::{draft_catalog, TestHarness},
integration_tests::harness::{draft_catalog, InjectBuildError, TestHarness},
publications::{DefaultRetryPolicy, JobStatus, LockFailure, RetryPolicy},
ControlPlane,
};
Expand Down Expand Up @@ -216,6 +216,45 @@ async fn test_publication_optimistic_locking_failures() {
)],
&expect_fail_result.status,
);

// Assert that PublicationSuperseded and BuildSuperseded errors get retried
let capture_draft = draft_catalog(serde_json::json!({
"captures": {
"mice/capture": minimal_capture(None, &["mice/cheese", "mice/seeds"]),
}
}));
harness.control_plane().fail_next_build(
"mice/capture",
InjectBuildError::new(
tables::synthetic_scope(CatalogType::Capture, "mice/capture"),
validation::Error::BuildSuperseded {
build_id: Id::zero(),
larger_id: Id::zero(),
},
),
);
harness.control_plane().fail_next_build(
"mice/capture",
InjectBuildError::new(
tables::synthetic_scope(CatalogType::Capture, "mice/capture"),
validation::Error::PublicationSuperseded {
last_pub_id: Id::zero(),
pub_id: Id::zero(),
},
),
);
let result = harness
.control_plane()
.publish(
Some("test retry Superseded errors".to_string()),
Uuid::new_v4(),
capture_draft.clone_specs(),
)
.await
.unwrap()
.error_for_status()
.unwrap();
assert_eq!(2, result.retry_count);
}

async fn assert_last_pub_build(
Expand Down
24 changes: 24 additions & 0 deletions crates/agent/src/publications/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl RetryPolicy for DefaultRetryPolicy {
);
return false;
}

// Has there been an optimistic locking failure?
match &result.status {
JobStatus::BuildIdLockFailure { failures } => {
Expand All @@ -34,7 +35,30 @@ impl RetryPolicy for DefaultRetryPolicy {
);
true
}
JobStatus::BuildFailed { .. } if has_only_build_errors(result) => {
let retry = result.built.errors.iter().all(|err| {
match err.error.downcast_ref::<validation::Error>() {
Some(validation::Error::BuildSuperseded { .. }) => return true,
Some(validation::Error::PublicationSuperseded { .. }) => return true,
_ => false,
}
});
if retry {
tracing::info!(
retry_count = result.retry_count,
"will retry due to publication/build superseded error"
)
}
retry
}
_ => false,
}
}
}

/// Reports whether `result` has at least one error in `built.errors` while
/// its `draft.errors`, `live.errors` and `test_errors` are all empty.
fn has_only_build_errors(result: &PublicationResult) -> bool {
    // With no build errors at all there is nothing to classify.
    if result.built.errors.is_empty() {
        return false;
    }

    let has_other_errors = !result.draft.errors.is_empty()
        || !result.live.errors.is_empty()
        || !result.test_errors.is_empty();
    !has_other_errors
}

0 comments on commit c59397b

Please sign in to comment.