Skip to content

Commit

Permalink
derivation_previews: new table and job
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Jul 13, 2023
1 parent e106494 commit 04441eb
Show file tree
Hide file tree
Showing 6 changed files with 440 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

62 changes: 62 additions & 0 deletions crates/agent-sql/src/derivation_previews.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
use super::{CatalogType, Id, TextJson as Json};
use chrono::prelude::*;
use serde::Serialize;
use serde_json::value::RawValue;
use sqlx::types::Uuid;

// Row is the dequeued task shape of a discover operation.
#[derive(Debug)]
pub struct Row {
pub created_at: DateTime<Utc>,
pub draft_id: Id,
pub id: Id,
pub logs_token: Uuid,
pub updated_at: DateTime<Utc>,
pub num_documents: usize,
pub collection_name: String,
}

pub async fn dequeue(txn: &mut sqlx::Transaction<'_, sqlx::Postgres>) -> sqlx::Result<Option<Row>> {
sqlx::query_as!(
Row,
r#"select
derivation_previews.created_at,
derivation_previews.draft_id as "draft_id: Id",
derivation_previews.id as "id: Id",
derivation_previews.logs_token,
derivation_previews.updated_at,
derivation_previews.num_documents,
derivation_previews.collection_name,
from derivation_previews
where derivation_previews.job_status->>'type' = 'queued'
order by derivation_previews.id asc
limit 1
for update of derivation_previews skip locked;
"#
)
.fetch_optional(txn).await
}

pub async fn resolve<S>(
id: Id,
status: S,
txn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> sqlx::Result<()>
where
S: Serialize + Send + Sync,
{
sqlx::query!(
r#"update derivation_previews set
job_status = $2,
updated_at = clock_timestamp()
where id = $1
returning 1 as "must_exist";
"#,
id as Id,
Json(status) as Json<S>,
)
.fetch_one(txn)
.await?;

Ok(())
}
2 changes: 2 additions & 0 deletions crates/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tables = { path = "../tables", features = ["persist"] }
anyhow = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
bytelines = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
Expand All @@ -35,6 +36,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
tempfile = { workspace = true }
tempdir = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
Expand Down
Loading

0 comments on commit 04441eb

Please sign in to comment.