Skip to content

Commit

Permalink
agent: retry on Build/PublicationSuperseded errors
Browse files Browse the repository at this point in the history
Updates the default publication retry policy to retry `BuildSuperseded`
and `PublicationSuperseded` errors. This addresses a portion of #1520
  • Loading branch information
psFried committed Sep 26, 2024
1 parent f61cb09 commit 30cce7e
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 2 deletions.
16 changes: 16 additions & 0 deletions crates/agent/src/integration_tests/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,22 @@ pub trait FailBuild: std::fmt::Debug + Send + 'static {
fn modify(&mut self, result: &mut UncommittedBuild);
}

#[derive(Debug)]
pub struct InjectBuildError(Option<tables::Error>);
impl InjectBuildError {
pub fn new(scope: url::Url, err: impl Into<anyhow::Error>) -> InjectBuildError {
InjectBuildError(Some(tables::Error {
scope,
error: err.into(),
}))
}
}
impl FailBuild for InjectBuildError {
fn modify(&mut self, result: &mut UncommittedBuild) {
result.output.built.errors.insert(self.0.take().unwrap());
}
}

/// A wrapper around `PGControlPlane` that has a few basic capbilities for verifying
/// activation calls and simulating failures of activations and publications.
pub struct TestControlPlane {
Expand Down
43 changes: 41 additions & 2 deletions crates/agent/src/integration_tests/locking_retries.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use models::Id;
use models::{CatalogType, Id};
use uuid::Uuid;

use crate::{
integration_tests::harness::{draft_catalog, TestHarness},
integration_tests::harness::{draft_catalog, InjectBuildError, TestHarness},
publications::{DefaultRetryPolicy, JobStatus, LockFailure, RetryPolicy},
ControlPlane,
};
Expand Down Expand Up @@ -216,6 +216,45 @@ async fn test_publication_optimistic_locking_failures() {
)],
&expect_fail_result.status,
);

/// Assert that PublicationSuperseded and BuildSuperseded errors get retried
let capture_draft = draft_catalog(serde_json::json!({
"captures": {
"mice/capture": minimal_capture(None, &["mice/cheese", "mice/seeds"]),
}
}));
harness.control_plane().fail_next_build(
"mice/capture",
InjectBuildError::new(
tables::synthetic_scope(CatalogType::Capture, "mice/capture"),
validation::Error::BuildSuperseded {
build_id: Id::zero(),
larger_id: Id::zero(),
},
),
);
harness.control_plane().fail_next_build(
"mice/capture",
InjectBuildError::new(
tables::synthetic_scope(CatalogType::Capture, "mice/capture"),
validation::Error::PublicationSuperseded {
last_pub_id: Id::zero(),
pub_id: Id::zero(),
},
),
);
let result = harness
.control_plane()
.publish(
Some("test retry Superseded errors".to_string()),
Uuid::new_v4(),
capture_draft.clone_specs(),
)
.await
.unwrap()
.error_for_status()
.unwrap();
assert_eq!(2, result.retry_count);
}

async fn assert_last_pub_build(
Expand Down
22 changes: 22 additions & 0 deletions crates/agent/src/publications/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,28 @@ impl RetryPolicy for DefaultRetryPolicy {
);
true
}
JobStatus::BuildFailed {
incompatible_collections,
..
} if incompatible_collections.is_empty() => {
let retry = result
.built
.errors
.iter()
.flat_map(|e| e.error.downcast_ref::<validation::Error>())
.all(|err| match err {
validation::Error::BuildSuperseded { .. } => return true,
validation::Error::PublicationSuperseded { .. } => return true,
_ => false,
});
if retry {
tracing::info!(
retry_count = result.retry_count,
"will retry due to publication/build superseded error"
)
}
retry
}
_ => false,
}
}
Expand Down

0 comments on commit 30cce7e

Please sign in to comment.