stage: updated the MarkApplied so that table updates happen in batches #1033

Merged on Oct 4, 2024 (1 commit)

Conversation

@ryanluu12345 commented Oct 2, 2024

Integrate batches.Window from util.Batches to split the MarkApplied work into batches, so that a single statement no longer sends a payload that exceeds sql.conn.max_read_buffer_message_size.

The new logic processes the mutations inside batches.Window, with a default window size of 100,000.

Resolves: #995
Release Note: None
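
For readers unfamiliar with the windowing approach, here is a minimal sketch of how a window helper can split a count into half-open ranges. The `window` and `ranges` functions are hypothetical stand-ins written for this illustration; they only mirror the call shape `batches.Window(size, count, func(begin, end int) error)` that appears in the diff and are not the project's implementation.

```go
package main

import "fmt"

// window is a hypothetical stand-in for batches.Window: it calls fn with
// successive half-open ranges [begin, end) that together cover 0..count.
func window(size, count int, fn func(begin, end int) error) error {
	if size <= 0 {
		return fmt.Errorf("window size must be positive, got %d", size)
	}
	for begin := 0; begin < count; begin += size {
		end := begin + size
		if end > count {
			end = count // the last window may be shorter
		}
		if err := fn(begin, end); err != nil {
			return err // stop at the first failing window
		}
	}
	return nil
}

// ranges collects the windows produced for a given size and count.
func ranges(size, count int) ([][2]int, error) {
	var out [][2]int
	err := window(size, count, func(begin, end int) error {
		out = append(out, [2]int{begin, end})
		return nil
	})
	return out, err
}

func main() {
	r, err := ranges(100_000, 250_000)
	if err != nil {
		panic(err)
	}
	fmt.Println(r) // [[0 100000] [100000 200000] [200000 250000]]
}
```

With the default size of 100,000, 250,000 mutations would be marked in three statements, the last one covering the remaining 50,000.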

Testing

  • Hardcoded a batch size of 1
  • Inserted 4 rows
  • Verified that they are marked applied in individual batches
DEBUG  [Oct  1 17:45:13] MarkApplied: "_replicator"."public"."molt_public_tbl1"@{NO_FULL_SCAN} marked 1 mutations 
DEBUG  [Oct  1 17:45:15]                                               httpRequest="&{0x14000611c20 45 200 3 4.961584ms   false false}"
DEBUG  [Oct  1 17:45:15] upserted rows                                 conflicts=0 duration=1.053291ms proposed=1 target="\"molt\".\"public\".\"tbl1\"" upserted=1
DEBUG  [Oct  1 17:45:15] MarkApplied: "_replicator"."public"."molt_public_tbl1"@{NO_FULL_SCAN} marked 1 mutations 
DEBUG  [Oct  1 17:45:16] staged mutations                              count=1 duration=4.098209ms target="\"_replicator\".\"public\".\"molt_public_tbl1\"@{NO_FULL_SCAN}"
DEBUG  [Oct  1 17:45:16]                                               httpRequest="&{0x140006fc6c0 125 200 3 4.272833ms   false false}"
DEBUG  [Oct  1 17:45:18]                                               httpRequest="&{0x140006fcb40 45 200 3 3.637875ms   false false}"
DEBUG  [Oct  1 17:45:18] upserted rows                                 conflicts=0 duration="807.875µs" proposed=1 target="\"molt\".\"public\".\"tbl1\"" upserted=1
DEBUG  [Oct  1 17:45:18] MarkApplied: "_replicator"."public"."molt_public_tbl1"@{NO_FULL_SCAN} marked 1 mutations 
DEBUG  [Oct  1 17:45:21]                                               httpRequest="&{0x140006fd7a0 45 200 3 4.040834ms   false false}"
DEBUG  [Oct  1 17:45:21] upserted rows                                 conflicts=0 duration=1.278084ms proposed=1 target="\"molt\".\"public\".\"tbl1\"" upserted=1
DEBUG  [Oct  1 17:45:21] MarkApplied: "_replicator"."public"."molt_public_tbl1"@{NO_FULL_SCAN} marked 1 mutations 
DEBUG  [Oct  1 17:45:24]                                               httpRequest="&{0x140008198c0 45 200 3 4.716833ms   false false}"
DEBUG  [Oct  1 17:45:27]                                               httpRequest="&{0x1400061a120 45 200 3 5.514375ms   false false}"
  • Verified that every row in the staging table ends up with applied set to true
[email protected]:26257/_replicator> SELECT * FROM molt_public_tbl1;                                                                    
         nanos        | logical |     key     |                         mut                          | before | applied |          applied_at           | deletion |          source_time
----------------------+---------+-------------+------------------------------------------------------+--------+---------+-------------------------------+----------+--------------------------------
  1727829869833460550 |       0 | [1204568]   | \x7b226964223a313230343536382c2274223a22796f227d     | NULL   |    t    | 2024-10-02 00:44:36.032538+00 |    f     | 2024-10-02 00:44:29.83346+00
  1727829908772441943 |       0 | [1204560]   | \x7b226964223a313230343536302c2274223a22796f227d     | NULL   |    t    | 2024-10-02 00:45:13.415785+00 |    f     | 2024-10-02 00:45:08.772442+00
  1727829910926786528 |       0 | [12045601]  | \x7b226964223a31323034353630312c2274223a22796f227d   | NULL   |    t    | 2024-10-02 00:45:15.030812+00 |    f     | 2024-10-02 00:45:10.926787+00
  1727829913129158195 |       0 | [12045604]  | \x7b226964223a31323034353630342c2274223a22796f227d   | NULL   |    t    | 2024-10-02 00:45:18.032043+00 |    f     | 2024-10-02 00:45:13.129158+00
  1727829915248298738 |       0 | [120456046] | \x7b226964223a3132303435363034362c2274223a22796f227d | NULL   |    t    | 2024-10-02 00:45:21.031444+00 |    f     | 2024-10-02 00:45:15.248299+00
(5 rows)


logical[idx] = mut.Time.Logical()
}

tag, err := db.Exec(ctx, s.sql.markApplied, keys, nanos, logical)

From my read of the code db can be a tx if we want extraSanityChecks == true. This is why I don't do any rollback or TX committing here.

However, in the ticket, @bobvawter did mention that you wanted the caller of this to pass in a TX and create it if it doesn't exist. To clarify, did you want to change this logic so that it's unconditionally a transaction instead of potentially separate db.Exec calls?

I understand that it is good to keep this a transaction now that we update in batches because we want to update these atomically, just like we previously were technically. If that's the case, I can just make it unconditionally a transaction and make the extraSanityChecks merely check consistency, if you don't see issues with that.

@bobvawter left a comment

Reviewable status: 0 of 1 files reviewed, 4 unresolved discussions (waiting on @Jeremyyang920 and @ryanluu12345)


internal/staging/stage/stage.go line 554 at r1 (raw file):

Previously, ryanluu12345 wrote…

Open questions here:

  1. What is a sane default? My understanding is that we start hitting the limit of the SQL cluster setting somewhere in the millions of rows, but I'm not sure whether this default is too low.
  2. Should this be configurable? I defaulted to no, since this is a property of the staging logic that the user shouldn't have to manage.

That seems like a reasonable default. It's far larger than any interactive OLTP workload would tend to generate. This change is to address bulk transactions.


internal/staging/stage/stage.go line 592 at r1 (raw file):

		}

		// Applies the mutations in batches to avoid exceeding the `sql.conn.max_read_buffer_message_size`

Reflow comments to 72 chars. The GoLand "Wrap to Column" plugin is my preferred tool.


internal/staging/stage/stage.go line 595 at r1 (raw file):

		// This also reduces the memory being used during this step in the case there are millions or more rows.
		if err := batches.Window(markAppliedBatchSize, len(muts), func(begin, end int) error {
			keys := make([]json.RawMessage, len(muts))

These slices are going to be full of empty values if windowing happens. The length should be end-begin.
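
To illustrate the sizing issue, here is a small sketch of building per-window argument slices. The `mutation` type and `batchColumns` function are hypothetical simplifications made up for this example; the real code works with types.Mutation and json.RawMessage keys.

```go
package main

import "fmt"

// mutation is a simplified, hypothetical stand-in for types.Mutation.
type mutation struct {
	Key   string
	Nanos int64
}

// batchColumns builds the per-column argument slices for the half-open
// window [begin, end). Each slice is sized to the window (end-begin), so
// it holds no trailing zero values. Sizing with len(muts) instead would
// send mostly empty entries whenever more than one window is used.
func batchColumns(muts []mutation, begin, end int) ([]string, []int64) {
	keys := make([]string, end-begin)
	nanos := make([]int64, end-begin)
	for idx := begin; idx < end; idx++ {
		// Offset by begin: the window's first mutation goes in slot 0.
		keys[idx-begin] = muts[idx].Key
		nanos[idx-begin] = muts[idx].Nanos
	}
	return keys, nanos
}

func main() {
	muts := []mutation{{"a", 1}, {"b", 2}, {"c", 3}, {"d", 4}, {"e", 5}}
	keys, nanos := batchColumns(muts, 2, 4)
	fmt.Println(keys, nanos) // [c d] [3 4]
}
```

Note that the write index has to be offset by begin as well; changing only the length and still writing to keys[idx] would index out of range for every window after the first.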


internal/staging/stage/stage.go line 604 at r1 (raw file):

Previously, ryanluu12345 wrote…

From my read of the code db can be a tx if we want extraSanityChecks == true. This is why I don't do any rollback or TX committing here.

However, in the ticket, @bobvawter did mention that you wanted the caller of this to pass in a TX and create it if it doesn't exist. To clarify, did you want to change this logic so that it's unconditionally a transaction instead of potentially separate db.Exec calls?

I understand that it is good to keep this a transaction now that we update in batches because we want to update these atomically, just like we previously were technically. If that's the case, I can just make it unconditionally a transaction and make the extraSanityChecks merely check consistency, if you don't see issues with that.

There's also
https://github.com/cockroachdb/replicator/blob/master/internal/sequencer/decorators/marker.go
and a handful of other places where a staging transaction may be created when multiple tables are in play. The thing to do here is to ensure there's a transaction if the number of marks exceeds the window size. We want to be able to use implicit transactions, unless there's a need to do something else.

@ryanluu12345 left a comment

Reviewable status: 0 of 1 files reviewed, 4 unresolved discussions (waiting on @bobvawter and @Jeremyyang920)


internal/staging/stage/stage.go line 554 at r1 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

That seems like a reasonable default. It's far larger than any interactive OLTP workload would tend to generate. This change is to address bulk transactions.

Ok great. Then I'll go ahead and leave it at this default. If it needs to be configurable later, we can plumb it through, but that's unnecessary for now.


internal/staging/stage/stage.go line 592 at r1 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Reflow comments to 72 chars. The GoLand "Wrap to Column" plugin is my preferred tool.

Noted, just installed Rewrap and will use that for my comment formatting. Note that I'll use crlfmt for my coding needs.


internal/staging/stage/stage.go line 595 at r1 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

These slices are going to be full of empty values if windowing happens. The length should be end-begin.

Great catch!

@ryanluu12345 left a comment

Reviewable status: 0 of 1 files reviewed, 4 unresolved discussions (waiting on @bobvawter and @Jeremyyang920)


internal/staging/stage/stage.go line 604 at r1 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

There's also
https://github.com/cockroachdb/replicator/blob/master/internal/sequencer/decorators/marker.go
and a handful of other places where a staging transaction may be created when multiple tables are in play. The thing to do here is to ensure there's a transaction if the number of marks exceeds the window size. We want to be able to use implicit transactions, unless there's a need to do something else.

Ok ended up fixing this! Please let me know your thoughts. I bundled this logic with the block above that creates a transaction for the extraSanityChecks. The logic here is that we need a new transaction in the two cases where we want to do consistency checks or we want to do multiple batches of applies but do so atomically.
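
The two cases described above can be sketched as a single predicate. The `needsTx` function and its parameters are hypothetical names chosen for this illustration; the merged code may consult other state, such as whether the caller already passed in a transaction.

```go
package main

import "fmt"

// needsTx reports whether marking should run inside an explicit
// transaction. It is a hypothetical helper covering only the two cases
// named in the review thread.
func needsTx(extraSanityChecks bool, numMuts, batchSize int) bool {
	// More than one window means several statements that must commit
	// or roll back together.
	multipleBatches := numMuts > batchSize
	return extraSanityChecks || multipleBatches
}

func main() {
	const batchSize = 100_000
	fmt.Println(needsTx(false, 10, batchSize))      // false
	fmt.Println(needsTx(false, 250_000, batchSize)) // true
	fmt.Println(needsTx(true, 10, batchSize))       // true
}
```

When neither condition holds, a single statement is sent and an implicit transaction is enough.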

@bobvawter left a comment

The implementation looks good, but it needs a test before merging.

Reviewed all commit messages.
Reviewable status: 0 of 1 files reviewed, 1 unresolved discussion (waiting on @Jeremyyang920 and @ryanluu12345)


internal/staging/stage/stage.go line 604 at r1 (raw file):

Previously, ryanluu12345 wrote…

Ok ended up fixing this! Please let me know your thoughts. I bundled this logic with the block above that creates a transaction for the extraSanityChecks. The logic here is that we need a new transaction in the two cases where we want to do consistency checks or we want to do multiple batches of applies but do so atomically.

That makes sense.


internal/staging/stage/stage.go line 619 at r3 (raw file):

			}

			log.Debugf("MarkApplied: %s marked %d mutations", s.stage, tag.RowsAffected())

Drop this back to trace; it's going to be very spammy in production.

@ryanluu12345 left a comment

My thought is to add another test case to TestPutAndDrain where the batch size is set to a very small number (like 1) and verify that it properly marks all the data as applied. Does this sound reasonable, or were you thinking of more of an integration or e2e test?
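
As a rough sketch of what such a check could assert, the example below uses a hypothetical in-memory `fakeStage` in place of a real staging table. It is not the test that was added to the PR; it only shows the shape of the assertion: with a batch size of 1, four keys should produce four statements and all four should end up marked.

```go
package main

import "fmt"

// fakeStage is a hypothetical in-memory stand-in for a staging table.
type fakeStage struct {
	batchSize int
	applied   map[string]bool
	execCalls int // number of simulated UPDATE statements
}

// markApplied marks keys in windows of at most batchSize keys, counting
// one simulated statement per window.
func (s *fakeStage) markApplied(keys []string) error {
	if s.batchSize <= 0 {
		return fmt.Errorf("batch size must be positive, got %d", s.batchSize)
	}
	for begin := 0; begin < len(keys); begin += s.batchSize {
		end := begin + s.batchSize
		if end > len(keys) {
			end = len(keys)
		}
		for _, k := range keys[begin:end] {
			s.applied[k] = true
		}
		s.execCalls++
	}
	return nil
}

func main() {
	s := &fakeStage{batchSize: 1, applied: map[string]bool{}}
	keys := []string{"k1", "k2", "k3", "k4"}
	if err := s.markApplied(keys); err != nil {
		panic(err)
	}
	fmt.Println(s.execCalls, len(s.applied)) // 4 4
}
```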

Reviewable status: 0 of 1 files reviewed, 1 unresolved discussion (waiting on @bobvawter and @Jeremyyang920)


internal/staging/stage/stage.go line 619 at r3 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Drop this back to trace; it's going to be very spammy in production.

Done.

@ryanluu12345 left a comment

Finished the tests. PR should be ready!

Reviewable status: 0 of 2 files reviewed, 1 unresolved discussion (waiting on @bobvawter and @Jeremyyang920)

@bobvawter left a comment

:lgtm: w/ nits

Reviewed 2 of 2 files at r4, all commit messages.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @Jeremyyang920 and @ryanluu12345)


internal/staging/stage/stage.go line 560 at r4 (raw file):

}

// These implementations are used for testing and help to set the mark applied

SetMarkAppliedBatchSize is used for testing ....


internal/staging/stage/stage.go line 566 at r4 (raw file):

}

func (s *stage) GetMarkAppliedBatchSize() int {

This method is unused.


internal/staging/stage/stage.go line 583 at r4 (raw file):

	ctx context.Context, db types.StagingQuerier, muts []types.Mutation,
) error {
	fmt.Println(s.markAppliedBatchSize)

Dangling log statement.

@ryanluu12345 left a comment

:lgtm:

Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @bobvawter and @Jeremyyang920)


internal/staging/stage/stage.go line 560 at r4 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

SetMarkAppliedBatchSize is used for testing ....

done


internal/staging/stage/stage.go line 566 at r4 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

This method is unused.

done


internal/staging/stage/stage.go line 583 at r4 (raw file):

Previously, bobvawter (Bob Vawter) wrote…

Dangling log statement.

done

@ryanluu12345 added this pull request to the merge queue Oct 4, 2024
Merged via the queue into master with commit 2bce2c1 Oct 4, 2024
51 of 52 checks passed
@ryanluu12345 deleted the user/rluu/batch-mark-apply branch October 4, 2024 18:01
Development

Successfully merging this pull request may close these issues.

MarkApplied can send over-large payloads
2 participants