Potential pattern of ignoring stranded RPC workers #5127

Open

chewbranca opened this issue Jul 11, 2024 · 4 comments

@chewbranca Contributor

While trying to understand why we'd encounter rexi:init_stream errors in #5122, I believe I've identified a pattern present in at least four of the fabric RPC-related modules. I think fabric_view_all_docs.erl is a relatively straightforward representation of the issue, so I'm going to dissect the flow from there.

Step 1) Instantiate RPC workers

We first create a set of RPC workers on the remote nodes as specified in Shards. This creates the handle Workers0 with a set of references to all instantiated RPC workers.

    Workers0 = fabric_util:submit_jobs(
        Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
    ),

Step 2) Create a set of monitors for all remote nodes

    RexiMon = fabric_util:create_monitors(Workers0),

This creates a set of monitors on the relevant remote rexi processes for each of the nodes in question, not the workers themselves:

    create_monitors(Shards) ->
        MonRefs = lists:usort([rexi_utils:server_pid(N) || #shard{node = N} <- Shards]),
        rexi_monitor:start(MonRefs).

Step 3) Handle fabric_streams:start in a try ... after ... end block

    try
        case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
            ...
        end
    after
        rexi_monitor:stop(RexiMon)
    end

This invokes fabric_streams:start inside a try block so that, in the after clause, we always invoke rexi_monitor:stop(RexiMon) to clear out the monitors.

Step 4) Handle the inner case clauses of Step 3)

First off we have the successful case when the stream has been initialized:

            {ok, Workers} ->
                try
                    go(DbName, Options, Workers, CoordArgs, Callback, Acc)
                after
                    fabric_streams:cleanup(Workers)
                end;

The key thing to note here is that this clause performs fabric_streams:cleanup(Workers) in the after clause of a try block to ensure the remote workers are cleaned up after the job is done.

However, the cleanup is performed against the subset of workers selected to perform the job in Workers, not the original full set of RPC workers instantiated and stored in Workers0.

Next we have the two failure cases for this fabric operation. I'll lump them together as their behavior is identical:

            {timeout, NewState} ->
                DefunctWorkers = fabric_util:remove_done_workers(
                    NewState#stream_acc.workers, waiting
                ),
                fabric_util:log_timeout(
                    DefunctWorkers,
                    "all_docs"
                ),
                Callback({error, timeout}, Acc);
            {error, Error} ->
                Callback({error, Error}, Acc)

Both of these failure clauses bubble the error up through the caller-provided Callback; however, neither performs any cleanup of the workers. In the outer after clause we do call rexi_monitor:stop(RexiMon), but that only kills the dedicated monitoring process and does nothing for the workers.

Core Issue

I think there are two things going on here that we need to address:

  1. RPC workers are not cleaned up at all upon fabric_streams:start error modes

I think this is fairly straightforward: we should always ensure workers are cleaned up, especially when failures happen. Basically, I think we should do a fabric_streams:cleanup on the workers in the outer after clause so they're always cleaned up (see the sketch after this list).

  2. When we do call fabric_streams:cleanup(Workers), it's on Workers instead of Workers0

This might be a bit more controversial, but I suspect one of the reasons #5122 manifests is that we're not diligent about canceling RPC workers. It's possible that fabric_streams:cleanup(Workers) is sufficient, but I think fabric_streams:cleanup(Workers0) against the full original set of workers is appropriate.

  3. Bonus item: we should consider moving the cleanup logic to the rexi_mon monitor

The core rationale here is that after clauses do not run when a process is killed, leaving the possibility of zombied remote RPC workers. In theory the remote nodes' rexi_server processes should get a process down notification? Again, perhaps that's sufficient, but I'm personally inclined to do double bookkeeping in these types of scenarios, where we monitor from the RPC side and also send out a kill signal from the coordinator side. What do folks think?
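
To make items 1 and 2 concrete, here's a minimal sketch of what the fabric_view_all_docs.erl flow above could look like with the cleanup moved to the outer after clause and run against Workers0. This is just an illustration of the proposal using the names from the snippets above, not the final implementation:

    Workers0 = fabric_util:submit_jobs(
        Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
    ),
    RexiMon = fabric_util:create_monitors(Workers0),
    try
        case fabric_streams:start(Workers0, #shard.ref, RingOpts) of
            {ok, Workers} ->
                go(DbName, Options, Workers, CoordArgs, Callback, Acc);
            {timeout, NewState} ->
                DefunctWorkers = fabric_util:remove_done_workers(
                    NewState#stream_acc.workers, waiting
                ),
                fabric_util:log_timeout(DefunctWorkers, "all_docs"),
                Callback({error, timeout}, Acc);
            {error, Error} ->
                Callback({error, Error}, Acc)
        end
    after
        %% Cancel the full original worker set, not just the subset in
        %% Workers, so the error and timeout paths can no longer strand workers.
        fabric_streams:cleanup(Workers0),
        rexi_monitor:stop(RexiMon)
    end.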

Presence in the codebase

Right now I think I've identified this pattern in four fabric modules (including fabric_view_all_docs.erl dissected above), although I've not done a full audit of the other modules, so there may be more instances of this.

@chewbranca Contributor Author

Looks like dreyfus_rpc does the right thing and cleans up the Workers in the outer after clause (https://github.com/apache/couchdb/blob/main/src/dreyfus/src/dreyfus_fabric_search.erl#L147), and it looks like that's the full list of workers too, not just the winning shard range workers. I suggest, at a minimum, we follow the same pattern from dreyfus_rpc and do cleanup on the full set of workers in the after clause.

I say "at a minimum" because I think we should consider moving the cleanup to the dedicated rexi_mon process, so that if the coordinator process dies the workers will still get cleaned up. This is definitely a secondary concern compared to the main source of stranded workers in this ticket, but still worth considering.
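
For the record, here's a rough sketch of the kind of dedicated cleaner being suggested: a small helper process that monitors the coordinator and cancels the workers if the coordinator dies, so cleanup no longer depends on the coordinator's own after clauses running. The function name and the stop message are purely illustrative, and it assumes rexi:kill/2 cancels a single remote worker by node and reference, with #shard coming from mem3.hrl as in the fabric modules:

    %% Illustrative sketch: monitor the coordinator and cancel the remote
    %% workers if it dies before it can clean them up itself.
    spawn_cleaner(Coordinator, Workers) ->
        spawn(fun() ->
            MonRef = erlang:monitor(process, Coordinator),
            receive
                {'DOWN', MonRef, process, Coordinator, _Reason} ->
                    %% The coordinator was killed, so its after clauses never
                    %% ran; cancel every remote worker from here instead.
                    [rexi:kill(Node, Ref) || #shard{node = Node, ref = Ref} <- Workers];
                stop ->
                    %% Normal completion; the coordinator already cleaned up.
                    ok
            end
        end).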

@nickva Contributor

nickva commented Jul 16, 2024

Good finds @chewbranca! Clearly there is something broken here and we should fix it. Thanks for the detailed analysis!

> we should consider moving the cleanup to the dedicated rexi_mon process

For streams we already have a cleanup process spawned for every streaming request: https://github.com/apache/couchdb/blob/main/src/fabric/src/fabric_streams.erl#L47. We should look into why that doesn't clean up the workers and lets them time out instead.

Perhaps it's being too cautious in order to avoid sending unnecessary kill messages? It tries to use rexi_STREAM_CANCEL, which makes the worker exit normally instead of killing it, to avoid generating SASL logs. But perhaps those logs wouldn't be generated anyway, as those workers are not gen_servers?

Recently we also added a kill_all command to aggregate kill commands per node, so instead of sending one per shard, it's one per node with a list of refs. Maybe that's enough to keep the overhead of the extra kills fairly low.
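
As a rough illustration of that aggregation (assuming a rexi:kill_all/1 that accepts a list of {Node, Ref} pairs and turns them into one message per node, as described above):

    %% Sketch: cancel all outstanding workers with one kill_all message per
    %% node, instead of one kill message per shard copy.
    cancel_workers(Workers) ->
        NodeRefs = [{Node, Ref} || #shard{node = Node, ref = Ref} <- Workers],
        rexi:kill_all(NodeRefs).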

Another thing to keep in mind is that we don't always want to kill the workers; at least in the update docs path we specifically allow them to finish updating, to reduce the pressure on the internal replicator.

> Looks like dreyfus_rpc does the right thing and cleans up the Workers in the outer after clause

Dreyfus doesn't use the streams facility, so it likely has a slightly different way of doing cleanup. There is also the complication of replacements: if they are spawned, those have to be cleaned up as well. However, if we do a blanket kill_all for all the workers, then it should take care of that, too. But it would be nice to see which corner cases we're missing currently: which errors are generated, and whether it's triggered by some error or just a race condition...

Do you have an easily reproducible scenario to test it out? Start a 3-node cluster and issue a bunch of _all_docs calls?

@nickva Contributor

nickva commented Jul 26, 2024

Having failed to reproduce this locally, I moved on to investigating on a cluster where this error happens regularly.

Found a cluster where exit:timeout stream init timeout errors happen up to 4000 times per minute. Noticed most of them are not generated by an error in the coordinator or the workers. The processes that generate those are calls to fabric:design_docs/1 from the ddoc cache recover logic. The calls don't seem to generate any failures except the left-over workers stuck in the stream_init state, waiting for stream start/cancel messages, which was rather baffling at first.

However, after a more thorough investigation, the reason turned out to be that design docs are updated often enough that the ddoc cache quickly fires up and then immediately kills the fabric:design_docs/1 process. There is nothing to log an error, and since these are not gen_servers registered with SASL, they don't emit any error logs, as expected.

In general, we already have a fabric_streams mechanism to handle the coordinator being killed unexpectedly. However, tracing the lifetime of the fabric:design_docs/1 processes, the coordinator is often killed before it gets a chance to even start the auxiliary cleanup process. The current pattern is something like this:

We submit the jobs:

    Workers0 = fabric_util:submit_jobs(
        Shards, fabric_rpc, all_docs, [Options, WorkerArgs]
    ),

Then we spawn the cleanup process:

    spawn_worker_cleaner(self(), Workers0, ClientReq),
    Timeout = fabric_util:request_timeout(),
    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of

These two steps may seem like they happen almost immediately one after the other; however, tracing the init_p call on the worker side and trying to log the process info of the caller (the coordinator), by the time the init_p function is called the coordinator is often already dead. Since we never got to spawn the cleaner process, there is nothing to clean up these workers.

On the positive side, these workers don't actually do any work; they just wait in a receive clause, albeit with an open Db handle, which is not too great.

To fix this particular case we have to ensure the cleaner process starts even earlier. By the time the coordinator submits the jobs, the cleanup process should already be up and waiting, with the node-ref tuples ready to clean them up.
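
Roughly, the reordering looks like the sketch below: create the worker references on the coordinator side first, hand them to the cleaner, and only then submit the jobs. The cast_ref call here just stands in for a rexi cast variant that accepts a caller-provided reference (the actual API is the rexi:cast_ref described in the commit message below), and #shard comes from mem3.hrl; the rest reuses names from the snippets above:

    %% Sketch: make the refs first, register them with the cleaner, then cast.
    submit_jobs_with_cleaner(Shards, Mod, Fun, Args, ClientReq) ->
        %% Pre-create one reference per worker on the coordinator side.
        Refs = [{Shard, make_ref()} || Shard <- Shards],
        Workers = [Shard#shard{ref = Ref} || {Shard, Ref} <- Refs],
        %% The cleaner knows every {Node, Ref} pair before any remote worker
        %% exists, so killing the coordinator early can no longer strand them.
        spawn_worker_cleaner(self(), Workers, ClientReq),
        %% Now actually submit the jobs, reusing the pre-made references.
        [
            rexi:cast_ref(Ref, Node, {Mod, Fun, [Name | Args]})
         || {#shard{node = Node, name = Name}, Ref} <- Refs
        ],
        Workers.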

nickva added a commit that referenced this issue Jul 26, 2024
Previously, if the coordinator process is killed too quickly, before the stream
worker cleanup process is spawned, remote workers may be left around
waiting until the default 5 minute timeout expires.

In order to reliably clean up processes in that state, we need to start the
cleaner process, with all the job references, before we start submitting them
for execution.

At first, it may seem impossible to monitor a process until after it's already
spawned. That's true for regular processes; however, rexi operates on plain
references. For each process we spawn remotely we create a reference on the
coordinator side, which we can then use to track that job. Those are just plain
manually created references. Nothing stops us from creating them first, adding
them to a cleaner process, and only then submitting them.

That's exactly what this commit accomplishes:

  * Create a streams specific `fabric_streams:submit_jobs/4` function, which
  spawns the cleanup process early, generates worker references, and then
  submits the jobs. This way, all the existing streaming submit_jobs calls can
  be replaced easily in one line: `fabric_util` -> `fabric_streams`.

  * The cleanup process operates as previously: monitors the coordinator for
  exits, and fires off `kill_all` message to each node when needed.

  * Create `rexi:cast_ref(...)` variants of `rexi:cast(...)` calls, where the
  caller specifies the references as arguments. This is what allows us to start
  the cleanup process before the jobs are even submitted. Older calls can just
  be transformed to call into the `cast_ref` versions with their own created
  references.

Noticed that we don't need to keep the whole list of shards in memory in the
cleaner process. For Q=64, N=3 that can add up to a decent blob of binary
paths. We only need node names (atoms) and refs. So updated to use just a set
of [{Node, Ref}, ...] tuples. It's a set since, in theory, someone could add
the same worker twice to it.

Since we added the new `rexi:cast_ref(...)` variants, make sure to add more test
coverage, including the streaming logic as well. It's not 100% yet, but getting
there.

Also, the comments in `rexi.erl` were full of erldoc stanzas and we don't
actually build erldocs anywhere, so replace them with something more helpful.
The streaming protocol itself was never quite described anywhere, and it can
take some time to figure out (at least it took me a while), so I took the chance
to also add a very basic, high-level description of the message flow.

Related: #5127 (comment)
nickva added a commit that referenced this issue Aug 2, 2024
Previously, we performed cleanup only for specific errors such as
`ddoc_updated` and `insufficient_storage`. In case of other errors, or
timeouts, there was a chance we would leak workers waiting to be either started
or canceled. Those workers would then wait around until the 5 minute rexi
timeout fires, and then emit an error in the logs. It's not a big deal, as that
happens on errors only and the processes are all waiting in receive; however,
they do hold a Db handle open, so they can waste resources from that point of
view.

To fix that, this commit extends cleanup to other errors and timeouts.

Moreover, in case of timeouts, we log fabric worker timeout errors. In order to
do that, the `fabric_streams` internal `#stream_acc` record ends up exported to
every `fabric_streams` user. That's a bit untidy, so make the timeout error
return the defunct workers only; that way we avoid leaking the `#stream_acc`
record outside the fabric_streams module.

Related to #5127
@nickva Contributor

nickva commented Aug 2, 2024

So far in production we noticed that most of the exit:timeout errors generated by rexi:init_stream came from the quick killing of design doc fetches by the ddoc cache. That should be fixed by #5152. However, the analysis above is also correct: we do not clean up workers on errors or timeouts, except for a few expected error types:

    handle_stream_start({ok, Error}, _, St) when Error == ddoc_updated; Error == insufficient_storage ->
        WaitingWorkers = [W || {W, _} <- St#stream_acc.workers],
        ReadyWorkers = [W || {W, _} <- St#stream_acc.ready],
        cleanup(WaitingWorkers ++ ReadyWorkers),

In this PR we extend the cleanup to all stream start errors, including timeouts.
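
The direction of that change can be sketched as a single helper, called from every error and timeout clause of handle_stream_start, so no start error path can leak workers. This is illustrative only, reusing the cleanup/1 call and #stream_acc fields quoted just above; the actual change also reshapes what the timeout clause returns so callers no longer need the #stream_acc record:

    %% Sketch: one cleanup helper for every non-success stream start outcome.
    cleanup_acc(#stream_acc{workers = Waiting, ready = Ready}) ->
        WaitingWorkers = [W || {W, _} <- Waiting],
        ReadyWorkers = [W || {W, _} <- Ready],
        cleanup(WaitingWorkers ++ ReadyWorkers).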
