Skip to content

Commit

Permalink
agent: refactor Publisher API
Browse files Browse the repository at this point in the history
Significantly refactors the `Publisher` in order to address a number of
outstanding issues. Introduces a `DraftPublication` struct that
represents the desire to publish a draft, along with associated metadata
and configuration. Also adds the `Publisher::publish(&self, DraftPublication)`
function, which handles build, commit, and retries.

Fixes #1634 by bypassing authorization checks for controller-initiated
publications. The `verify_user_authz` field of `DraftPublication` can be
used to toggle this behavior.

Takes care of a portion of #1520 by generating a new publication id for
each attempted publication, rather than re-using the `publications.id`
column value. This required adding a new `publications.pub_id` column
to hold the effective publication id, since `publications.id` can no
longer be used to join to `publication_specs` or
`live_specs.last_pub_id`.
  • Loading branch information
psFried committed Sep 26, 2024
1 parent b9968f0 commit f61cb09
Show file tree
Hide file tree
Showing 20 changed files with 658 additions and 391 deletions.
5 changes: 4 additions & 1 deletion crates/agent-sql/src/publications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ pub async fn dequeue(
pub async fn resolve<S>(
id: Id,
status: &S,
final_pub_id: Option<Id>,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<()>
where
Expand All @@ -274,12 +275,14 @@ where
sqlx::query!(
r#"update publications set
job_status = $2,
updated_at = clock_timestamp()
updated_at = clock_timestamp(),
pub_id = $3
where id = $1
returning 1 as "must_exist";
"#,
id as Id,
Json(status) as Json<&S>,
final_pub_id as Option<Id>,
)
.fetch_one(txn)
.await?;
Expand Down
42 changes: 19 additions & 23 deletions crates/agent/src/api/create_data_plane.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::publications::{DoNotRetry, DraftPublication, NoExpansion, PruneUnboundCollections};

use super::App;
use anyhow::Context;
use regex::bytes::NoExpand;
use std::sync::Arc;
use validator::Validate;

Expand Down Expand Up @@ -184,30 +187,23 @@ async fn do_create_data_plane(
let draft: tables::DraftCatalog = serde_json::from_str::<models::Catalog>(&draft_str)
.unwrap()
.into();

let pub_id = id_generator.lock().unwrap().next();
let built = publisher
.build(
user_id,
pub_id,
Some(format!("publication for data-plane {base_name}")),
draft,
insert.logs_token,
&data_plane_name,
)
.await?;

if built.has_errors() {
for err in built.output.errors() {
tracing::error!(scope=%err.scope, err=format!("{:#}", err.error), "data-plane-template build error")
}
anyhow::bail!("data-plane-template build failed");
}

_ = publisher
.commit(built)
let publication = DraftPublication {
user_id,
logs_token: insert.logs_token,
draft,
dry_run: false,
detail: Some(format!("publication for data-plane {base_name}")),
// TODO: Should we verify user authz for publications to create a data plane?
verify_user_authz: true,
default_data_plane_name: Some(data_plane_name.clone()),
initialize: NoExpansion,
finalize: PruneUnboundCollections,
retry: DoNotRetry,
};
publisher
.publish(publication)
.await
.context("committing publication")?
.context("publishing ops catalog")?
.error_for_status()?;

tracing::info!(
Expand Down
60 changes: 25 additions & 35 deletions crates/agent/src/api/update_l2_reporting.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::publications::{DoNotRetry, DraftPublication, NoExpansion, NoopFinalize};

use super::App;
use anyhow::Context;
use std::sync::Arc;
Expand All @@ -18,15 +20,12 @@ pub struct Response {
}

#[tracing::instrument(
skip(pg_pool, publisher, id_generator),
skip(pg_pool, publisher),
err(level = tracing::Level::WARN),
)]
async fn do_update_l2_reporting(
App {
pg_pool,
publisher,
id_generator,
..
pg_pool, publisher, ..
}: &App,
super::ControlClaims { sub: user_id, .. }: super::ControlClaims,
Request {
Expand Down Expand Up @@ -174,41 +173,32 @@ export class Derivation extends Types.IDerivation {"#
..Default::default()
};

let pub_id = id_generator.lock().unwrap().next();
let logs_token = uuid::Uuid::new_v4();

let uncommitted = publisher
.build(
user_id,
pub_id,
Some(format!("publication for updating L2 reporting")),
draft,
logs_token,
&default_data_plane,
)
.await?;

if uncommitted.has_errors() {
for err in uncommitted.output.errors() {
tracing::error!(scope=%err.scope, err=format!("{:#}", err.error), "data-plane-template build error")
let publication = DraftPublication {
user_id,
logs_token,
draft,
dry_run,
detail: Some(format!("publication for updating L2 reporting")),
default_data_plane_name: Some(default_data_plane.clone()),
// TODO: should we verify user authz for updating L2 reporting?
verify_user_authz: true,
initialize: NoExpansion,
finalize: NoopFinalize,
retry: DoNotRetry,
};
let result = publisher
.publish(publication)
.await
.context("publishing L2 reporting catalog")?;
if !result.status.is_success() {
for err in result.draft_errors() {
tracing::error!(error = ?err, "data-plane-template build error");
}
anyhow::bail!("data-plane-template build failed");
}

let (live, draft) = if !dry_run {
let published = publisher
.commit(uncommitted)
.await
.context("committing publication")?
.error_for_status()?;

(published.live.collections, published.draft.collections)
} else {
(
uncommitted.output.live.collections,
uncommitted.output.draft.collections,
)
};
let (live, draft) = (result.live.collections, result.draft.collections);
tracing::info!(%logs_token, %dry_run, "updated L2 reporting");

let previous = serde_json::json!({
Expand Down
55 changes: 19 additions & 36 deletions crates/agent/src/controlplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use serde_json::value::RawValue;
use sqlx::types::Uuid;
use std::{collections::BTreeSet, ops::Deref};

use crate::publications::{JobStatus, PublicationResult, Publisher};
use crate::publications::{
DefaultRetryPolicy, DraftPublication, JobStatus, NoExpansion, NoopFinalize, PublicationResult,
Publisher,
};

macro_rules! unwrap_single {
($catalog:expr; expect $expected:ident not $( $unexpected:ident ),+) => {
Expand Down Expand Up @@ -350,41 +353,21 @@ impl ControlPlane for PGControlPlane {
logs_token: Uuid,
draft: tables::DraftCatalog,
) -> anyhow::Result<PublicationResult> {
let mut maybe_draft = Some(draft);
let mut attempt = 0;
loop {
let draft = maybe_draft.take().expect("draft must be Some");
attempt += 1;
let publication_id = self.id_generator.next();
let built = self
.publications_handler
.build(
self.system_user_id,
publication_id,
detail.clone(),
draft,
logs_token,
"", // No default data-plane.
)
.await?;
if built.errors().next().is_some() {
return Ok(built.build_failed());
}
let commit_result = self.publications_handler.commit(built).await?;

// Has there been an optimistic locking failure?
let JobStatus::BuildIdLockFailure { failures } = &commit_result.status else {
// All other statuses are terminal.
return Ok(commit_result);
};
if attempt == Publisher::MAX_OPTIMISTIC_LOCKING_RETRIES {
tracing::error!(%attempt, ?failures, "giving up after maximum number of optimistic locking retries");
return Ok(commit_result);
} else {
tracing::info!(%attempt, ?failures, "publish failed due to optimistic locking failure (will retry)");
maybe_draft = Some(commit_result.draft);
}
}
let publication = DraftPublication {
user_id: self.system_user_id,
logs_token,
draft,
detail,
dry_run: false,
// Controllers don't publish new specs currently, so this is not needed
default_data_plane_name: None,
// skip authz checks for controller-initiated publications
verify_user_authz: false,
initialize: NoExpansion,
finalize: NoopFinalize,
retry: DefaultRetryPolicy,
};
self.publications_handler.publish(publication).await
}

async fn data_plane_activate(
Expand Down
Loading

0 comments on commit f61cb09

Please sign in to comment.