Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add failed offset to Flink error messages #336

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

gfmio
Copy link

@gfmio gfmio commented Aug 31, 2023

This PR adds the topic, partition and offset to the error message when Flink encounters a null key to simplify finding the offending message.

@gfmio gfmio marked this pull request as ready for review August 31, 2023 20:20
@@ -45,7 +45,7 @@ public RoutableKafkaIngressDeserializer(Map<String, RoutingConfig> routingConfig
public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
final String topic = input.topic();
final byte[] payload = input.value();
final byte[] key = requireNonNullKey(input.key());
Copy link

@kazimirov999 kazimirov999 Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, I'd like your thoughts on the idea of incorporating the ability to skip records with null keys. Additionally, I'm thinking of making this functionality configurable. What are your thoughts on this?

kind: io.statefun.kafka.v1/ingress
spec:
  id: localhost:9092
  consumerGroupId: test-group-dev
  startupPosition:
    type: earliest
  properties:
    - isolation.level: read_committed
    - security.protocol: SSL
    - ssl.truststore.location: /tmp/flink/jks/truststore.jks
    - ssl.truststore.password: changeit
    - ssl.keystore.location: /tmp/flink/jks/kafka_keystore.jks
    - ssl.keystore.password: test
    - ssl.key.password: test
  topics:
    - topic: test-dev
      skipNullKeyRecords: true
      valueType: ua.Test
      targets:
        - ua.Test

default can be skipNullKeyRecords: false

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That might work. The NoOpDeserliazer just returns null, which could be done here as well.

private static class NoOpDeserializer implements KafkaIngressDeserializer<String> {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants