while (offset < totalCount) {
    // Construct the query to read a page of data from the source bucket
    // and load the current page from Couchbase into a DataFrame
    Dataset<Row> currentBatch = spark.read()
            .format("couchbase.query")
            .option("bucket", "Bucket_1")
            .load()
            .offset(offset)
            .limit(pageSize);

    if (currentBatch.isEmpty()) {
        // No more data to process, exit the loop
        spark.stop();
        cluster.disconnect();
        break;
    }

    // Copy _id into __META_ID so it can be used as the document ID on write
    Dataset<Row> dataToWrite = currentBatch.withColumn("__META_ID", functions.col("_id"));

    // // Transform the DataFrame to match the target bucket's schema, if necessary
    // Dataset<Row> transformedBatch = currentBatch;

    // Write the batch to the target bucket
    System.out.println(offset);
    dataToWrite.write().format("couchbase.kv")
            .option("bucket", "Bucket_2")
            .option("scope", "Data")
            .option("collection", "test")
            .option("idField", "__META_ID") // Specify the field to use as the document ID
            .mode("errorifexists") // or "overwrite" depending on your needs
            .save();
    System.out.println(offset);

    // Increment the offset for the next page
    offset += pageSize;
}
It always leads to a duplicate key error in Couchbase. I am very sure that _id has no duplicate keys, so why does this happen?
While all the _id values in currentBatch might be unique, there may already be a document in Bucket_2.Data.test whose id matches one of those _id values. For instance, if you execute that code twice, the second execution is guaranteed to hit duplicates.
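If you expect to run the job more than once against a populated Bucket_2.Data.test, one way to make the write idempotent is the save mode your own comment already hints at. A minimal sketch of the same write, assuming (as I recall) that the connector treats the "overwrite" mode as an upsert:

    dataToWrite.write().format("couchbase.kv")
            .option("bucket", "Bucket_2")
            .option("scope", "Data")
            .option("collection", "test")
            .option("idField", "__META_ID")
            .mode("overwrite") // upsert: an existing document with the same id is replaced instead of raising an error
            .save();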
How can the key be duplicated in the second execution? I am using an offset and limit in the query, which I think guarantees that the data from the first batch has no items in common with the second batch.
It looks like there is an expectation that the _id property in documents from Bucket_1 is a unique id. The following query should return an empty result if they are indeed unique.
select _id, count(_id) from Bucket_1 group by _id having count(_id) > 1
There is also an expectation that the _id from Bucket_1 is unique with respect to documents in Bucket_2.Data.test. The following query should return an empty result if they are indeed unique.
select b1._id from Bucket_1 b1, Bucket_2.Data.test b2 where b1._id = meta(b2).id
That is not guaranteed. Without an ORDER BY, OFFSET/LIMIT does not define a stable ordering, so pages can overlap. For example, assume you have 11 rows and the first page returns 10 of them. Now insert another row that sorts before those 10 rows: when you run the query with OFFSET 10, the previous 10th row is returned again.
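If the pages need to be stable, one alternative is keyset pagination: order by _id and filter on the last id already processed instead of using OFFSET. A minimal sketch, assuming _id is a unique string key and reusing the spark and pageSize variables from your snippet (these are plain Spark DataFrame operations, so it is correct even if the connector does not push the filter down to N1QL):

    String lastId = ""; // lower bound for the next page; "" sorts before any non-empty id
    while (true) {
        Dataset<Row> page = spark.read()
                .format("couchbase.query")
                .option("bucket", "Bucket_1")
                .load()
                .filter(functions.col("_id").gt(lastId)) // only rows strictly after the last key seen
                .orderBy(functions.col("_id"))           // deterministic order, so pages cannot overlap
                .limit(pageSize);

        if (page.isEmpty()) {
            break; // no more rows to migrate
        }

        // ... write `page` to Bucket_2 here, exactly as in your original snippet ...

        // Remember the highest _id in this page as the lower bound for the next one
        lastId = page.agg(functions.max("_id")).first().getString(0);
    }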
@abdallahmselim looking at the Spark connector code, that looks like the wrong option name:
/** Option Key: The field name of the document ID, used to override the default.
*
* The default can be located in [[DefaultConstants.DefaultIdFieldName]]
*/
val IdFieldName = "idFieldName"
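So, based on that constant, the option key would presumably have to be "idFieldName" rather than "idField"; an unrecognized option would simply be ignored, and the connector would fall back to the default id field from DefaultConstants.DefaultIdFieldName. Something like:

    dataToWrite.write().format("couchbase.kv")
            .option("bucket", "Bucket_2")
            .option("scope", "Data")
            .option("collection", "test")
            .option("idFieldName", "__META_ID") // key taken from the IdFieldName constant above
            .mode("errorifexists")
            .save();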