
[Deduplication] primary key working differently when running in same session vs running a new session #11241

Open
anambiar-epsilon opened this issue on Oct 1, 2024 · 1 comment
Labels: bug

@anambiar-epsilon

Apache Iceberg version

1.6.0

Query engine

Flink

Please describe the bug 🐞

I have the following code:
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // Set to BATCH mode for SQL

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();

TableEnvironment tableEnv = TableEnvironment.create(settings);

// Step 2: Define the Glue catalog
tableEnv.executeSql("""
        CREATE CATALOG glue_catalog WITH (
        'type'='iceberg',
        'warehouse'='s3://flink-poc-4/flink_table_iceberg_12',
        'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO');
        """);

// Step 3: Use the Glue catalog
tableEnv.executeSql("USE CATALOG glue_catalog");

// Step 4: Create the Iceberg table with upsert enabled and a partition column
String createTableSQL = """
        CREATE TABLE test_akshay.flink_table_iceberg_12 (
            id BIGINT,
            data STRING NOT NULL,
            year1 INT NOT NULL,
            PRIMARY KEY(id) NOT ENFORCED
        ) WITH (
            'format-version' = '2',
            'write.upsert.enabled' = 'true',
            'write.merge.mode' = 'copy-on-write',
            'write.delete.mode' = 'copy-on-write',
            'write.update.mode' = 'copy-on-write',
            'partition-spec' = 'year1'
        )
        """;

tableEnv.executeSql(createTableSQL);

// Step 5: Insert initial mock data with the year value
String insertDataSQL = """
        INSERT INTO test_akshay.flink_table_iceberg_12 VALUES
        (1, 'data1', 2023),
        (2, 'data2', 2023),
        (3, 'data3', 2024)
        """;

tableEnv.executeSql(insertDataSQL);

// Step 6: Upsert (update existing and insert new records) with the year value
String upsertDataSQL = """
        INSERT INTO test_akshay.flink_table_iceberg_12 VALUES
        (1, 'wowow', 2023),
        (4, 'data5', 2024)
        """;

tableEnv.executeSql(upsertDataSQL);

// Step 7: Query the table to validate the upsert results
String querySQL = "SELECT * FROM test_akshay.flink_table_iceberg_12";
tableEnv.executeSql(querySQL).print();
```
    
When running this code I get only 4 ids (expected), but the value of `id 1` is `data1` rather than `wowow`. If I then close this session, start a new one, and run a simple insert of `id 1` with `data8`, the value of `id 1` gets upserted to `data8`. Is this expected? What is the expected behaviour here?
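
To be concrete, the second-session run is essentially just the snippet below (same catalog and table as above; the year value `2023` is only illustrative):

```java
// New session: re-register the same Glue catalog and insert a row for an existing key.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

tableEnv.executeSql("""
        CREATE CATALOG glue_catalog WITH (
        'type'='iceberg',
        'warehouse'='s3://flink-poc-4/flink_table_iceberg_12',
        'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
        'io-impl'='org.apache.iceberg.aws.s3.S3FileIO');
        """);
tableEnv.executeSql("USE CATALOG glue_catalog");

// id 1 already exists; after this insert the table reads back 'data8' for id 1.
tableEnv.executeSql("""
        INSERT INTO test_akshay.flink_table_iceberg_12 VALUES
        (1, 'data8', 2023)
        """);

tableEnv.executeSql("SELECT * FROM test_akshay.flink_table_iceberg_12").print();
```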

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
anambiar-epsilon added the bug label on Oct 1, 2024
@anambiar-epsilon (Author)

One more question: if this is the expected behaviour, the documentation says deduplication needs to be implemented at the application layer, but it appears that Iceberg does deduplicate on the primary key (declared NOT ENFORCED) once upsert is enabled. Can we expect the same behaviour for a high-load streaming job (around 5000 rps) as well?
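
For context, what I have in mind for application-layer dedupe is something like Flink SQL's ROW_NUMBER() deduplication applied in the job before the Iceberg sink, sketched below; the `source_stream` table and its `proc_time` time attribute are hypothetical placeholders, not part of my setup:

```java
// Sketch only: keep the latest row per id inside the Flink job before writing
// to the Iceberg table. `source_stream` and `proc_time` are hypothetical.
tableEnv.executeSql("""
        INSERT INTO test_akshay.flink_table_iceberg_12
        SELECT id, data, year1
        FROM (
            SELECT id, data, year1,
                   ROW_NUMBER() OVER (PARTITION BY id ORDER BY proc_time DESC) AS row_num
            FROM source_stream
        )
        WHERE row_num = 1
        """);
```

My question is whether that extra step is still needed at that throughput, or whether the upsert-enabled primary key alone is enough.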
