write().format("couchbase.kv") always leads to duplicate key errors

Given this code:

    while (offset < totalCount) {
        // Load the current page from the source bucket into a DataFrame
        Dataset<Row> currentBatch = spark.read()
                .format("couchbase.query")
                .option("bucket", "Bucket_1")
                .load()
                .offset(offset)
                .limit(pageSize);

        if (currentBatch.isEmpty()) {
            // No more data to process, exit the loop
            spark.stop();
            cluster.disconnect();
            break;
        }

        // Copy the _id field into __META_ID so it can serve as the document key
        Dataset<Row> dataToWrite = currentBatch.withColumn("__META_ID", functions.col("_id"));

        // Transform the DataFrame to match the target bucket's schema here, if necessary

        // Write the batch to the target bucket
        System.out.println(offset);
        dataToWrite.write().format("couchbase.kv")
                .option("bucket", "Bucket_2")
                .option("scope", "Data")
                .option("collection", "test")
                .option("idField", "__META_ID") // Specify the field to use as the document ID
                .mode("errorifexists") // or "overwrite" depending on your needs
                .save();
        System.out.println(offset);

        // Increment the offset for the next page
        offset += pageSize;
    }

It always leads to a duplicate key error in Couchbase. I am quite sure that _id has no duplicate values, so why does this happen?

What is the __META_ID of every Row in dataToWrite?

Does functions.col("_id") give a unique value?

currentBatch.withColumn("__META_ID", functions.col("_id"))

It copies the _id column into __META_ID, so the document key used for the Couchbase write will be equal to the value of the _id column.
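
A quick way to see this for yourself (a debugging sketch reusing the column names from the snippet above; the counts are only for verification):

    // Inspect the keys the connector will use as document IDs
    dataToWrite.select("__META_ID", "_id").show(5, false);

    // If distinctIds < rows, the batch itself contains duplicate keys
    long rows = dataToWrite.count();
    long distinctIds = dataToWrite.select("__META_ID").distinct().count();
    System.out.println(rows + " rows, " + distinctIds + " distinct IDs");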

I am quite sure that _id has no duplicate values, so why does this happen?

While all the _id values in currentBatch might be unique, there may already be a document in Bucket_2.Data.test with the id of one of those values. For instance, if you execute that code twice, the second execution is guaranteed to produce duplicates.

How can the key be duplicated in the second execution? I am doing an offset and limit in the query, which I think guarantees that the data from the first batch has no items in common with the second batch.

We need to figure out how they can be duplicates.

  1. It looks like there is an expectation that the _id property in documents from Bucket_1 is a unique id. The following query should return an empty result if they are indeed unique.

select _id, count(_id) from Bucket_1 group by _id having count(_id) > 1

  2. There is also an expectation that the _id from Bucket_1 is unique with respect to documents in Bucket_2.Data.test. The following query should return an empty result if they are indeed unique.

select b1._id from Bucket_1 b1, Bucket_2.Data.test b2 where b1._id = meta(b2).id

OFFSET/LIMIT will not guarantee that. Assume you have 11 rows and fetched the first 10. Now another row is inserted that sorts before those 10. When you run with OFFSET 10, the previous 10th row is returned again.

See if you can use keyset pagination on the document key, passing the last document key (or your unique id) from one execution to the next instead of an OFFSET; see OFFSET and Keyset Pagination in N1QL Query | Couchbase. A sketch follows below.
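
A minimal Java sketch of that approach, assuming _id is a unique string field and reusing the bucket and option names from the original snippet (persist() just avoids recomputing each page; session setup and error handling are omitted):

    String lastId = ""; // assumes every _id sorts after the empty string
    while (true) {
        Dataset<Row> page = spark.read()
                .format("couchbase.query")
                .option("bucket", "Bucket_1")
                .load()
                .where(functions.col("_id").gt(functions.lit(lastId))) // keyset predicate instead of OFFSET
                .orderBy("_id")
                .limit(pageSize)
                .persist();

        if (page.isEmpty()) {
            page.unpersist();
            break;
        }

        page.withColumn("__META_ID", functions.col("_id")) // used as the document key
                .write().format("couchbase.kv")
                .option("bucket", "Bucket_2")
                .option("scope", "Data")
                .option("collection", "test")
                .mode("errorifexists")
                .save();

        // The largest key in this page is the starting point for the next one
        lastId = page.agg(functions.max("_id")).first().getString(0);
        page.unpersist();
    }

Because every page continues strictly after the last key already seen, a row inserted concurrently can shift later pages but can never cause an earlier row to be returned twice.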


@abdallahmselim looking at the Spark connector code, that looks like the wrong option name; the connector expects idFieldName rather than idField:

  /** Option Key: The field name of the document ID, used to override the default.
    *
    * The default can be located in [[DefaultConstants.DefaultIdFieldName]]
    */
  val IdFieldName = "idFieldName"
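
If that is the cause, the write would need to spell the option as idFieldName; with the misspelled option the connector falls back to the default id field name instead. A sketch of the corrected write:

    dataToWrite.write().format("couchbase.kv")
            .option("bucket", "Bucket_2")
            .option("scope", "Data")
            .option("collection", "test")
            .option("idFieldName", "__META_ID") // matches the IdFieldName constant above
            .mode("errorifexists")
            .save();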
