Update cdc/server/integration_test.go with workload.Checker #906

Open
bobvawter opened this issue Jun 7, 2024 · 7 comments
Labels
good first issue Good for newcomers

Comments

@bobvawter
Member

This is a pretty old test and could stand to be refreshed with the newer workload.Checker.

@bobvawter added the "good first issue" label Jun 7, 2024
@ryanluu12345
Contributor

Is the intended scope of this change to rewrite this test so that it follows the same Workload convention as internal/source/kafka/integration_test.go? That is, rewriting it so that every aspect of the test goes through the workload struct helpers? Or is it just to use the workload.Checker to verify that the results are reasonable?

@bobvawter
Member Author

If not a rewrite, then add another test method that inserts into the source tables using data derived from the generator and then validates the target tables with the checker.

@ryanluu12345
Contributor

ryanluu12345 commented Oct 2, 2024

Ok sounds like a plan. After digging a bit deeper into the code today, I think I have a general understanding of the workflow here:

  1. Instantiate a new all.Fixture, which gives us access to the NewWorkload helper that creates a workload on our behalf
  2. Get the configuration for the replicator mode we are trying to run (in this case, CDC)
  3. Create a new "producer", which is essentially what writes the data to the source (in the CDC case, we could also start the changefeed job for our workload table here)
  4. Start the replicator service
  5. In a loop, generate batches into a multi-batch, then write each generated batch using the "producer" from above
  6. Use the workload checker to wait for catch-up, then check all counters and that the target data matches the source data
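
The steps above can be sketched roughly as follows. This is a minimal, self-contained illustration and does not use the replicator packages: the table, mutation, generateBatch, and runWorkload names are all hypothetical stand-ins, the "changefeed" is just a Go channel, and the final comparison only imitates what a checker might do.

```go
package main

import (
	"fmt"
	"sync"
)

// Everything below is a hypothetical stand-in; none of these types come
// from the replicator codebase.

// mutation is a simplified row change keyed by primary key.
type mutation struct {
	Key   int
	Value string
}

// table is a tiny in-memory stand-in for a source or target table.
type table struct {
	mu   sync.Mutex
	rows map[int]string
}

func newTable() *table { return &table{rows: make(map[int]string)} }

// apply upserts every mutation in the batch.
func (t *table) apply(batch []mutation) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, m := range batch {
		t.rows[m.Key] = m.Value
	}
}

// snapshot returns a copy of the current rows.
func (t *table) snapshot() map[int]string {
	t.mu.Lock()
	defer t.mu.Unlock()
	out := make(map[int]string, len(t.rows))
	for k, v := range t.rows {
		out[k] = v
	}
	return out
}

// generateBatch plays the role of the workload generator (step 5).
func generateBatch(batchNum, size int) []mutation {
	batch := make([]mutation, 0, size)
	for i := 0; i < size; i++ {
		key := batchNum*size + i
		batch = append(batch, mutation{Key: key, Value: fmt.Sprintf("v%d", key)})
	}
	return batch
}

// runWorkload mirrors steps 3 through 6: a "producer" writes generated
// batches to the source, a goroutine standing in for the replicator
// forwards them to the target, and a final check compares both tables.
// It reports whether the tables match and how many target rows exist.
func runWorkload(batches, size int) (bool, int) {
	source, target := newTable(), newTable()
	feed := make(chan []mutation) // stand-in for the changefeed

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // step 4: the "replicator" applies incoming changes
		defer wg.Done()
		for batch := range feed {
			target.apply(batch)
		}
	}()

	for b := 0; b < batches; b++ { // step 5: generate and write batches
		batch := generateBatch(b, size)
		source.apply(batch)
		feed <- batch
	}
	close(feed)
	wg.Wait() // step 6: wait for the target to catch up

	src, dst := source.snapshot(), target.snapshot()
	if len(src) != len(dst) {
		return false, len(dst)
	}
	for k, v := range src {
		if dst[k] != v {
			return false, len(dst)
		}
	}
	return true, len(dst)
}

func main() {
	ok, n := runWorkload(3, 4)
	fmt.Println(ok, n)
}
```

In the real test, the source and target would be database fixtures and the catch-up wait would rely on replication stats rather than a WaitGroup.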

Please let me know if I'm misunderstanding any of these steps.

I do have a few open questions:

  1. In the CDC case, the "producer" isn't a message queue like Kafka or an object store like S3, but rather the source database. Is it a correct assumption that we need to create a new producer that has access to the sourceFixture and can create the changefeed that ends up sending data to the target?
  2. In the other cases, kafka and objstore, a Start method gets called. Looking into source/cdc/server, I don't see an equivalent, but I do see newTestFixture, which seems to do the same thing: wire everything up and return a server. Would this be what I use in place of the Start that I see in the other examples? Code from the kafka example for reference:
	serverCfg, err := getConfig(destFixture, fc, []string{target.Raw()}, target)
	r.NoError(err)
	timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	connCtx := stopper.WithContext(timeoutCtx)
	kafka, err := Start(connCtx, serverCfg)
	r.NoError(err)

  3. If the answer to the above question is yes, is it correct to assume that the replicator server is started by newTestFixture, and that all that remains is to seed the source DB and start the changefeed on the source?
  4. Is this piece of code still relevant? I don't see a conveyor attribute on the server struct returned from the newTestFixture call. I know it's important for stats, which is how we know the workload has caught up. I'm wondering if there is a better way to determine this in the context of ProvideServer.
	conn, cancel, err := newTestFixture(connCtx, serverCfg)
	defer cancel()
	r.NoError(err)
	stats := test.conveyor.(interface {
		Stat() *notify.Var[sequencer.Stat]
	}).Stat()
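
For context, the snippet above relies on a general Go technique: asserting a value against an anonymous interface to reach a method that its declared interface type does not expose. A minimal sketch of that pattern, using made-up types (conveyor, impl, and statOf are hypothetical and unrelated to the replicator's own definitions):

```go
package main

import "fmt"

// conveyor is a hypothetical public interface that does not expose Stat.
type conveyor interface {
	Name() string
}

// impl is a hypothetical concrete type that carries an extra Stat method.
type impl struct{ applied int }

func (i *impl) Name() string { return "demo" }
func (i *impl) Stat() int    { return i.applied }

// plain is a hypothetical conveyor without a Stat method.
type plain struct{}

func (plain) Name() string { return "plain" }

// statOf asserts the value against an anonymous interface to reach a
// method outside the declared interface. The comma-ok form avoids a
// panic when the underlying type lacks the method.
func statOf(c conveyor) (int, bool) {
	s, ok := c.(interface{ Stat() int })
	if !ok {
		return 0, false
	}
	return s.Stat(), true
}

func main() {
	fmt.Println(statOf(&impl{applied: 7}))
	fmt.Println(statOf(plain{}))
}
```

The single-value form of the assertion, as in the quoted test code, panics if the method is missing, which may be acceptable inside a test.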

@bobvawter
Member Author

https://github.com/cockroachdb/replicator/blob/bob_core_open/internal/source/pglogical/integration_test.go#L101-L279 is what I'm thinking. The details of setting up the replication stream are going to be different (changefeed vs. pglogical), but the overall flow of setting up the fixture and the meat of the test are going to be the same.

Answers to your questions:

  1. Yes.
  2. It was more convenient to have the injector create a running server than to have a separate Start method. There's no specific reason it must be this way.
  3. Correct. The server created by the test injector will be ready to accept an incoming changefeed.
  4. You'd still need to get access to the Stat to know when the replication channel has caught up.

@ryanluu12345
Contributor

Thanks for answering questions 1 to 3. This already gives me a great idea of how to start. Regarding item 4, would you be fine if I expose a stat field on that struct for newTestFixture so that I can just assign the sequencer stat there and have access to it outside of the test?

@bobvawter
Member Author

There's no single stat variable to export from the injector. It's an instance within the Conveyor associated with the target schema. You can grab it as follows:

https://github.com/cockroachdb/replicator/blob/master/internal/source/cdc/handler_test.go#L825-L828

@ryanluu12345
Contributor

Spoke offline with @bobvawter about how to get the mutations into the source database in the first place:

We can use base.Fixture.Swapped and all.NewFixtureFromBase. These let us take the mutations produced by the workload generator and translate them into a format the source can interpret.

Setup:
https://github.com/cockroachdb/replicator/blob/bob_core_open/internal/source/pglogical/integration_test.go#L174-L179
Insert:
https://github.com/cockroachdb/replicator/blob/bob_core_open/internal/source/pglogical/integration_test.go#L233-L246
Validate:
https://github.com/cockroachdb/replicator/blob/bob_core_open/internal/source/pglogical/integration_test.go#L271-L272

2 participants