
[bug] Not all messages are consumed when dedupe is enabled #955

Open · zagadheesh opened this issue May 27, 2024 · 8 comments · May be fixed by #972

System Information

  • Aedes: 0.51.1
  • NodeJS: 16.15.0
  • OS: Mac OS 14.5
  • Arch: x86_64
  • Java: 1.8

Describe the bug
I'm running Aedes alongside an MQTT Java client using example.js (attached), with no modifications to the Aedes source code. Although both the consumer and producer are running, some messages are never consumed. Notably, after commenting out the dedupe call in client.js of the Aedes source, the consumer receives all messages.
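For reference, the dedupe logic in lib/client.js looks roughly like the paraphrased sketch below (not a verbatim copy of the 0.51.1 source): a packet is kept only if its brokerCounter is strictly greater than the last counter recorded for its brokerId, so packets delivered out of order are dropped along with real duplicates.

// Paraphrased sketch of the dedupe logic in lib/client.js (not verbatim)
function dedupe (client, packet) {
  const id = packet.brokerId
  if (!id) {
    return true
  }
  const duplicates = client.duplicates
  const counter = packet.brokerCounter
  // Keep the packet only if its counter is strictly greater than the last one seen
  const keep = (duplicates[id] || 0) < counter
  if (keep) {
    duplicates[id] = counter
  }
  return keep
}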

To Reproduce
Steps to reproduce the behavior:

  1. Run the broker using example.js against the unmodified Aedes 0.51.1 source code
  2. Run AedesSampleConsumer.java
  3. Run AedesSampleProducer.java

Expected behavior
Consumer should consume all the messages produced.

aedes-problem-demo.zip
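(The attachment is not expanded here. For orientation, a minimal broker script along the lines of example.js might look like the sketch below; the MongoDB persistence and its connection URL are assumptions, since the actual configuration lives inside the zip.)

// Hypothetical reduction of example.js: a bare Aedes broker over TCP.
// The aedes-persistence-mongodb setup and URL are assumptions, not the zip's contents.
const persistence = require('aedes-persistence-mongodb')({
  url: 'mongodb://127.0.0.1/aedes'
})
const aedes = require('aedes')({ persistence })
const server = require('net').createServer(aedes.handle)

server.listen(1883, () => {
  console.log('Aedes broker listening on port 1883')
})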

zagadheesh added the bug label May 27, 2024
@robertsLando (Member) commented:
@zagadheesh I actually have no time to dig into this. Could you try to draft a PR to fix the issue? I already fixed dedupe a while ago for the same reason, and the problem then was the persistence being used.

@zagadheesh (Author) commented:
@robertsLando The proposed pull request would simply comment out the dedupe call in client.js. After testing release 0.42.6 without altering any source code, I observed that the consumer successfully consumed all messages. Beyond that, however, I haven't dug into the rest of the source code. Any suggestions would be greatly appreciated!

@robertsLando (Member) commented May 27, 2024:

Commenting out the dedupe function is not the solution; it is there for a reason and should stay. We need to find out why the messages are not being deduped correctly.

It would be better if you could reproduce the same with a consumer/producer in Node.js instead of Java.

@zagadheesh (Author) commented:
Reproduced the same in Node.js. Please find the attached zip file, which contains example.js (starting the broker with the required configuration) along with producer.js and consumer.js.

sample.zip
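(The zip is not expanded here; a minimal producer/consumer pair along these lines, using the mqtt package, exercises the same path. The topic name, QoS level, and message count are illustrative.)

// producer.js (hypothetical reduction of the attached file)
const mqtt = require('mqtt')
const producer = mqtt.connect('mqtt://localhost:1883')

producer.on('connect', () => {
  for (let i = 0; i < 100; i++) {
    producer.publish('demo/topic', `message ${i}`, { qos: 1 })
  }
  producer.end() // waits for in-flight QoS 1 publishes before closing
})

// consumer.js (hypothetical reduction): with dedupe enabled, the final
// count printed here comes up short of the number published
const mqtt = require('mqtt')
const consumer = mqtt.connect('mqtt://localhost:1883')
let received = 0

consumer.on('connect', () => {
  consumer.subscribe('demo/topic', { qos: 1 })
})

consumer.on('message', (topic, payload) => {
  console.log(`#${++received}: ${payload.toString()}`)
})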

@mvandevalk commented:
The issue seems to revolve around the ordering of brokerCounter values. It is reproducible when using MongoDB as the persistence type. My assumption is that we do not want to ignore out-of-order counters, only counters that have already been processed.

For example:

  • First packet: brokerId = 1, brokerCounter = 10 -> return true
  • Second packet: brokerId = 1, brokerCounter = 9 -> return true
  • Third packet: brokerId = 1, brokerCounter = 11 -> return true
  • Fourth packet: brokerId = 1, brokerCounter = 10 -> return false

If this assumption is correct, then instead of tracking only the largest counter value, maintaining a recent set of already-processed values could improve this function:

const MAX_SIZE = 1000

function dedupe (client, packet) {
  const id = packet.brokerId
  if (!id) {
    return true
  }

  const counter = packet.brokerCounter
  const duplicates = client.duplicates

  if (!duplicates[id]) {
    // seen: a Set for O(1) lookup of counters already processed
    // order: an array preserving insertion order, so the oldest entry can be evicted
    duplicates[id] = { seen: new Set(), order: [] }
  }

  const entry = duplicates[id]
  if (entry.seen.has(counter)) {
    return false
  }

  entry.seen.add(counter)
  entry.order.push(counter)

  // Cap memory: once more than MAX_SIZE counters are tracked, evict the oldest
  if (entry.order.length > MAX_SIZE) {
    const oldest = entry.order.shift()
    entry.seen.delete(oldest)
  }
  return true
}

This approach keeps track of recently processed values, which handles out-of-order counters without discarding legitimate packets.
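Walking the four-packet sequence above through this sketch shows the intended behavior (the bare client object is a hypothetical stand-in for Aedes's real client state):

// Hypothetical walk-through of the four-packet example above
const client = { duplicates: {} }

dedupe(client, { brokerId: 1, brokerCounter: 10 }) // true  (first sighting of 10)
dedupe(client, { brokerId: 1, brokerCounter: 9 })  // true  (out of order, but never seen)
dedupe(client, { brokerId: 1, brokerCounter: 11 }) // true
dedupe(client, { brokerId: 1, brokerCounter: 10 }) // false (10 was already processed)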

@robertsLando (Member) commented:
Could you submit a PR?

@mvandevalk commented:
Please review: #972

@robertsLando (Member) commented:
Done
