From 30cce7e6a2a480cc1dbf979f756c3ebc1c7f6e1d Mon Sep 17 00:00:00 2001 From: Phil Date: Wed, 25 Sep 2024 17:23:22 -0400 Subject: [PATCH] agent: retry on Build/PublicationSuperseded errors Updates the default publication retry policy to retry `BuildSuperseded` and `PublicationSuperseded` errors. This addresses a portion of #1520 --- crates/agent/src/integration_tests/harness.rs | 16 +++++++ .../src/integration_tests/locking_retries.rs | 43 ++++++++++++++++++- crates/agent/src/publications/retry.rs | 22 ++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) diff --git a/crates/agent/src/integration_tests/harness.rs b/crates/agent/src/integration_tests/harness.rs index 7721fb85ee..67e449601e 100644 --- a/crates/agent/src/integration_tests/harness.rs +++ b/crates/agent/src/integration_tests/harness.rs @@ -898,6 +898,22 @@ pub trait FailBuild: std::fmt::Debug + Send + 'static { fn modify(&mut self, result: &mut UncommittedBuild); } +#[derive(Debug)] +pub struct InjectBuildError(Option); +impl InjectBuildError { + pub fn new(scope: url::Url, err: impl Into) -> InjectBuildError { + InjectBuildError(Some(tables::Error { + scope, + error: err.into(), + })) + } +} +impl FailBuild for InjectBuildError { + fn modify(&mut self, result: &mut UncommittedBuild) { + result.output.built.errors.insert(self.0.take().unwrap()); + } +} + /// A wrapper around `PGControlPlane` that has a few basic capbilities for verifying /// activation calls and simulating failures of activations and publications. pub struct TestControlPlane { diff --git a/crates/agent/src/integration_tests/locking_retries.rs b/crates/agent/src/integration_tests/locking_retries.rs index 968dc0f7f3..f9cb54cba4 100644 --- a/crates/agent/src/integration_tests/locking_retries.rs +++ b/crates/agent/src/integration_tests/locking_retries.rs @@ -1,8 +1,8 @@ -use models::Id; +use models::{CatalogType, Id}; use uuid::Uuid; use crate::{ - integration_tests::harness::{draft_catalog, TestHarness}, + integration_tests::harness::{draft_catalog, InjectBuildError, TestHarness}, publications::{DefaultRetryPolicy, JobStatus, LockFailure, RetryPolicy}, ControlPlane, }; @@ -216,6 +216,45 @@ async fn test_publication_optimistic_locking_failures() { )], &expect_fail_result.status, ); + + /// Assert that PublicationSuperseded and BuildSuperseded errors get retried + let capture_draft = draft_catalog(serde_json::json!({ + "captures": { + "mice/capture": minimal_capture(None, &["mice/cheese", "mice/seeds"]), + } + })); + harness.control_plane().fail_next_build( + "mice/capture", + InjectBuildError::new( + tables::synthetic_scope(CatalogType::Capture, "mice/capture"), + validation::Error::BuildSuperseded { + build_id: Id::zero(), + larger_id: Id::zero(), + }, + ), + ); + harness.control_plane().fail_next_build( + "mice/capture", + InjectBuildError::new( + tables::synthetic_scope(CatalogType::Capture, "mice/capture"), + validation::Error::PublicationSuperseded { + last_pub_id: Id::zero(), + pub_id: Id::zero(), + }, + ), + ); + let result = harness + .control_plane() + .publish( + Some("test retry Superseded errors".to_string()), + Uuid::new_v4(), + capture_draft.clone_specs(), + ) + .await + .unwrap() + .error_for_status() + .unwrap(); + assert_eq!(2, result.retry_count); } async fn assert_last_pub_build( diff --git a/crates/agent/src/publications/retry.rs b/crates/agent/src/publications/retry.rs index 83242294ee..b2aa21e0a2 100644 --- a/crates/agent/src/publications/retry.rs +++ b/crates/agent/src/publications/retry.rs @@ -34,6 +34,28 @@ impl RetryPolicy for DefaultRetryPolicy { ); true } + JobStatus::BuildFailed { + incompatible_collections, + .. + } if incompatible_collections.is_empty() => { + let retry = result + .built + .errors + .iter() + .flat_map(|e| e.error.downcast_ref::()) + .all(|err| match err { + validation::Error::BuildSuperseded { .. } => return true, + validation::Error::PublicationSuperseded { .. } => return true, + _ => false, + }); + if retry { + tracing::info!( + retry_count = result.retry_count, + "will retry due to publication/build superseded error" + ) + } + retry + } _ => false, } }