Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Dekaf control-plane components #1665

Open
wants to merge 19 commits into
base: master
Choose a base branch
from

Conversation

jshearer
Copy link
Contributor

@jshearer jshearer commented Sep 30, 2024

This PR builds ontop of #1642, but is different enough that I didn't want to push to that PR after it was already reviewed. It is the first couple parts of #1622. We have:

  • The new endpoint type
        endpoint:
          dekaf:
            variant: some-dekaf-variant
            config:
              token: "foo"
  • Dekaf implements a stripped down materialize_unary that is/will be responsible for validating these endpoint/resource configs.
    • TODO: Figure out how to expose the schemars schema for DekafConfig and DekafResourceConfig. I'm pretty sure it looks like teaching dekaf::materialize_unary about the spec RPC, but not 100% sure.
      • Does this mean that we'll need to create some sort of fake connector_tags row and set it to discover/cache the schema? That's probably possible, but probably also won't just work OOTB.
    • It appears that there are two separate places where this needs to happen, so I did it in both. Is that true?
      • agent::ProxyConnectors
      • runtime::Runtime.
  • New scenario tests that cover dekaf-type materializations, indirection of config, etc.
  • Moved the flowctl::Client logic to a new crate: flow_client that both flowctl and dekaf can depend on
  • A backwards-compatible update to the Dekaf service that allows you to pass in the name of a materialization as username, and the corresponding bearer token as a password
    • TODO: This is stubbed out at the point where we need to validate the token -- need to figure out how to actually fetch the latest built spec for a particular task name, extract its config, unseal it, etc.
  • Change Dekaf behavior if it's provided a task name/valid token
    • TODO: Metadata/discovery: Return bindings for this task instead of all collections a user has grants to
    • TODO: Schema registry: For /subjects, return materialization bindings instead of all collections
    • TODO: Self-sign data-plane access token in order to actually read data when not provided a refresh token

I think the above TODOs should be resolved by a follow-up PR in order to get this new materialization type to the UI team ASAP.


This change is Reviewable

@jshearer jshearer added the change:planned This is a planned change label Sep 30, 2024
@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch 2 times, most recently from c82e06c to 56c04af Compare September 30, 2024 14:56
Copy link
Member

@jgraettinger jgraettinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good! Some comments for you.

crates/sources/src/indirect.rs Outdated Show resolved Hide resolved
crates/dekaf/src/connector.rs Show resolved Hide resolved
crates/flow-client/src/client.rs Outdated Show resolved Hide resolved
}
}

pub fn pg_client(&self) -> postgrest::Postgrest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a down-side to re-building the client from scratch every time: we will no longer re-use the underlying connection pool, and will re-dial connections every time.

That's the primary reason that the refresh flow lived outside of this Client implementation, and resulted in a new Client being built.

I think it'd be better to have the refresh routine be a free function in this crate that operates on a Client, and perhaps to provide a garden path for how to build a next Client after performing such a refresh, but to keep it a bit more arms-length.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a down-side to re-building the client from scratch every time: we will no longer re-use the underlying connection pool, and will re-dial connections every time.

Riiight, yeah. So what if Client kept around a single Postgrest client that it hands out whenever somebody calls pg_client() (postgrest::Client is a thin wrapper around an Arc<InnerClient> so it can be shared easily), and which only gets recreated upon refresh. In fact, we could even check if a refresh is needed inside the call to pg_client() so you don't even need to think about it: any time you call .pg_client(), you'll get a client with valid access credentials.

Ideally I'd love to wrap Postgrest in a wrapper that checks and auto-refreshes its access token every time you make an API call. That way it could modify its own header map and not need to get re-created at all. But I think my proposed solution above is a decent compromise

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's far too much coupling between Client, which we should seek to keep very un-opinionated, with a highly opinionated refresh flow.

For example, refresh() will create a refresh token if it doesn't already have one, even if there's a perfectly valid access token available. Is that appropriate for flowctl? I think "yes", though even that is possibly debatable. Is it always appropriate? Definitely not.

Or what if we introduce a new mechanism for refresh tokens? We easily could -- we're currently ripping out gateway_auth_token after all.

I don't think Client should know anything about the manner in which tokens are obtained, and that that business logic belongs in free function(s) that operate on a Client.

Copy link
Contributor Author

@jshearer jshearer Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this what you meant? I think I got a bit lost between flow_client::Client and postgrest::Postgrest when reading your comments, but this should:

  • Move opinionated refresh functionality to bare function to avoid coupling flow_client::Client to specific token refresh logic
  • Create a single postgrest::Postgrest at flow_client::Client creation time and hand out clones of it with auth headers set to the current access token upon request. This should allow all users of pg_client() to share a connection pool, while still allowing flow_client::Client to hand out postgrest::Postgrest instances to all of the places that need them

Copy link
Member

@jgraettinger jgraettinger Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Back up a step, and look at how flowctl on master updates it's configuration and then writes it out on exit. This change, as is, breaks that workflow because token refresh is treated as in interior detail of flow_client::Client, and I'm saying: it shouldn't be.

flow_client::Client should be immutable exactly because of the kind of bug which currently exists in flowctl within this branch.

This is how I think it should be structured:

  • flow_client::Client, once created, is immutable. We don't do any interior mutation, ever.
  • Have a flow_client::refresh_authorizations(client: &Client, access_token: Option<String>, refresh_token: Option<RefreshToken>) -> anyhow::Result<(String, RefreshToken)> routine which takes an optional access and/or refresh token, and returns a guaranteed access and refresh token (or an error).
    • This routine should be the only place we parse the access token and do an expiration check -- currently this logic is also still duplicated in flowctl/lib.rs.
    • Feel free to implement a further subroutine for getting an access token from a refresh token, if that's useful for dekaf.
  • Use this method in flowctl to update the configured access and refresh token, and to then re-build the client, as it had been doing prior to this PR.

IIRC there isn't a reason for a flow_client::Client::claims() routine aside from checking expiration, but I might be missing something. In any case, IMO this too should be a free function if it's still needed and Client should not have an opinion on the shape of an access token.

Copy link
Contributor Author

@jshearer jshearer Oct 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the problem, and why the raw access/refresh tokens need to be exposed -- in order for them to get set on flowctl::Config + written back to disk. I believe this fixes flowctl writing updated creds back to disk. I tested it locally by removing user_access_token from my local.json profile, ran cargo run -p flowctl auth roles list and it wrote back a new access token

crates/flowctl/src/config.rs Outdated Show resolved Hide resolved
crates/runtime/src/unary.rs Outdated Show resolved Hide resolved
crates/sources/src/loader.rs Outdated Show resolved Hide resolved
crates/validation/tests/scenario_tests.rs Outdated Show resolved Hide resolved
@jshearer
Copy link
Contributor Author

jshearer commented Oct 1, 2024

Fixes #1657

@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch from 29145d8 to 6b056cb Compare October 1, 2024 15:32
A materialization of this kind is purely descriptive, and cannot be directly started. It is used to convey the intent to expose a set of bindings through Dekaf, which will look it up to determine things like binding names and field selection/projections.
…very journals, but they _should_ get ops stats+logs collections
@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch from a69e659 to e78de08 Compare October 2, 2024 19:21
@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch from e78de08 to d0f5ba7 Compare October 2, 2024 19:37
@jshearer
Copy link
Contributor Author

jshearer commented Oct 2, 2024

@jgraettinger re-requested your review. I believe I've addressed everything, except #1665 (comment) if I didn't understand exactly what you wanted there.

`flowctl` needs these credentials exposed in order to store them in `flowctl::Config` + on disk for the next flowctl invocation
@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch from f9fcd6c to 14c50a8 Compare October 3, 2024 21:07
@jshearer jshearer force-pushed the jshearer/dekaf_resource_config branch from 232af77 to d55ecee Compare October 4, 2024 14:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
change:planned This is a planned change
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants