I`m writing docs into couchbase,i received a exception when failover happend.
“com.couchbase.client.java.error.DurabilityException: Durability requirement failed: The CAS on the active node changed for ID “Item411”, indicating it has been modified in the meantime.”.
I use “insert(doc,ReplicateTo.ONE)”, and i find data was lossed.
did you create the doc immediately before (with a JsonDocument.create
)? Did you put a CAS value for it? No other process is performing insertions/upserts on the same key?
This error happens when by the time the document can be seen on the replica (after a polling), the document’s CAS has changed, indicating that the replicated document was from another mutation.
No,i haven`t create the doc immediately,does ReplicateTo.ONE is a constraint
option that must send an replica to other node?
yes ReplicateTo.ONE
is about that: the SDK will wait until the mutation can be seen on at least ONE replica. Of course, replication must be activated for the bucket on the server side (but if it wasn’t, the SDK would give a different exception (ReplicaNotConfiguredException
).
ReplicateTo.xxx is like an observer that watch whether the data been replicate completely,is that right?
I in order to avoid data loss when hard failover occurs,i use insert with ReplicateTo.xxx,but the exception “com.couchbase.client.java.error.DurabilityException: Durability requirement failed: The CAS on the active node changed for…” rised,i don’t understand why the exception happend,when this exception happened my doc wasn`t inserted to couchbase.
It is like an observer, yes. The actual act of replicating is configured by the replication factor of the bucket, on the server side. The server will always honor the replication factor independently of your code calling ReplicateTo.XXX
. This is just instructing the SDK to wait for confirmation of replication before returning execution to the user.
Can you share a bit of code?
- did you customize environment settings when creating the cluster?
- how do you create the document?
- how do you call insert? (what parameters + give some context, eg. if it’s in a loop, if it’s right after creating the
JsonDocument
instance or in another method, etc…)
Here is my code
Observable.from(docs)
.flatMap(new Func1<Document<T>, Observable<Document<T>>>(){
@Override
public Observable<Document<T>> call(final Document<T> t1) {
return retryProcess1(bucket.async().insert(t1, ReplicateTo.ONE));
}
})
.toBlocking()
.forEach(new Action1<Document>(){
@Override
public void call(Document arg0) {
// TODO Auto-generated method stub
retList.add(Boolean.TRUE);
}});
private <T> Observable<Document<T>> retryProcess1(Observable<Document<T>> observer)
{
return observer.retryWhen(
RetryBuilder.anyOf(new Class[] { RequestCancelledException.class}).delay(Delay.fixed(31L, TimeUnit.SECONDS))
.max(100)
.build())
.retryWhen(RetryBuilder.anyOf(new Class[] { TemporaryFailureException.class, BackpressureException.class })
.delay(Delay.fixed(100L, TimeUnit.MILLISECONDS))
.max(200)
.build())
.retryWhen(RetryBuilder.anyOf(new Class[] { DocumentConcurrentlyModifiedException.class })
.delay(Delay.fixed(10L, TimeUnit.MILLISECONDS))
.max(200)
.build());
}
And this is with a cluster in a normal state? No rebalancing/failover/down node?
Could you try to do a minimal but complete Main class that would allow us to just copy and execute to reproduce the problem?
On a side note you don’t need the new Class[] { ... }
for the retry builder, just put the classes separated by commas like this: RetryBuilder.anyOf(TemporaryFailureException.class, BackpressureException.class)...
void setData(Bucket bucket){
for(int i=1;i<=20000;i++){
JsonDocument doc= JsonDocument.create(i,
JsonObject.empty()
.put("id",i)
.put("DocType", "DocType"));
for(;;){
try{
JsonDocument succdoc=bucket.insert(doc,ReplicateTo.ONE);
if(succdoc.id()!=null){
break;
}
continue;
}catch(RequestCancelledException ex) {
ex.printStackTrace();
continue;
}catch(DurabilityException ex){
ex.printStackTrace();
if( ex.getCause() != null && ex.getCause() instanceof DocumentConcurrentlyModifiedException ){
continue;
}
}catch(CompositeException ex ){
ex.printStackTrace();
continue;
}catch(TemporaryFailureException ex ){
ex.printStackTrace();
continue;
}catch(RuntimeException ex ){
ex.printStackTrace();
if(bucket.get("VoccCode"+i)!=null){
break;
}
continue;
}
}
}
}
This my method,when i loading data i press the hard-failover button,then present this exception.Does the replicas in sequence?
I had made some test about loading data when failover happend,
this is my test result, does this is right?
ok so in this context I think it makes sense. You use 2 replicas, but wait for just 1 replica to acknowledge the data (ReplicateTo.ONE
). When you failover the node, one of the 2 replicas becomes active. That means that as long as you don’t rebalance, you have 1 replica less for that subset of data (eg. old master is down, old replica 1 is now master, old replica 2 is not replica 1).
When you’re inserting during the failover, you could run into 3 scenarios:
- replication has time to reach only replica1, but replica2 is promoted: replica2 never saw the document and so you get a durability exception
- replication has time to reach only replica1, and it is the same replica that is promoted: this case replica2 will be detected as “behind” and catch up. => success
- replication has time to reach both replicas before the failover happens. => success
Generally in order to be absolutely sure the best is to use ReplicateTo.TWO
(in your case), that is wait for all replicas to ack.
You should indeed retry if you receive a DurabilityException
Thank you for your detailed answers,I think I understand. In scenarios 1, if replica2 never saw the document then this document will be abandoned?I have a detailed question that does replicas has order number?Does One document will be send to No.1 replica first then send to No.2 next …?
If active vbuckets down does No.1 replica will be promoted?
No the replication is asynchronous and not necessarily sequential. Same for promotion of the replica during failover. I used names “replica1” and “replica2” just for referencing them but you don’t have the notion of an order of replicas in the couchbase admin UI.
thanks , i had read the sdk source codes, i met some code that difficult to understand ,as follows:
ObserveViaMutationToken.call(…){
…scan(ObserveItem.empty(), new Func2<ObserveItem, ObserveItem, ObserveItem>() {
@Override
public ObserveItem call(ObserveItem currentStatus, ObserveItem newStatus) {
return currentStatus.add(newStatus);
}
})
//repetitions will occur unless errors are raised
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
return observable.zipWith(
Observable.range(1, Integer.MAX_VALUE),
new Func2<Void, Integer, Integer>() {
@Override
public Integer call(Void aVoid, Integer attempt) {
return attempt;
}
}
).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer attempt) {
return Observable.timer(delay.calculate(attempt), delay.unit());
}
});
}
})…
}
i don’t know what does repeatWhen(…) going to do,if scan(…) haven’t got all response’s ObserveItem does sendObserveRequests(…) will always retry?
(OFF TOPIC: please use code block formatting)
There is the block syntax with the three backticks. you can even specify the language:
```java
// someJavaCodeHere
Alternatively you can link directly to a source line on github and a preview will get included:
```url
https://github.com/couchbase/couchbase-jvm-core/blob/master/src/main/java/com/couchbase/client/core/message/observe/ObserveViaCAS.java#L90
thank you very much,i wanna learn couchbase’s inside datastructs could i find couchbase source code?
To answer your previous question, the idea of this piece of code is to check how many replicas have observed the mutation. ObserveItem aggregates the state of how many observe are successful. Then the repeatWhen will continue the polling if not enough observes are successful compared to the ReplicateTo
and PersistTo
constraints. Overall, this will be limited by the observeTimeout
.
For couchbase server, open-source parts are on github in several repositories.
I don’t understand what is the mutationtoken in the response,and when it going
to be null? I see It is used to ObserveViaCAS(),ObserveViaMutationToken().
This is internal to the SDK.
If the server is capable of it (since 4.0), additionally to a CAS on a mutation it will return a more complex object (with a sequence number, vbucket, etc…) to the clients that ask for it.
This is internally represented as a MutationToken
and can be used by the SDK to perform safer Observe
, better honoring ReplicateTo
and PersistTo
constraints when multiple parallel mutations are done on a same document.
Whether every response has the MutationToken ? I see the “DocumentConcurrentlyModifiedException” only throw when response doesn’t have MutationToken
I read the client code,i assume a scenarios that:
bucket has 2 replicas,i insert use reolicateTo.ONE,
1 insert request get response successfully
2 observe request get response from master and replica1 successfully ,then client insert completely
But then,failover happens,the doc hasn’t reach to replica2 , the replica2 was promoted.As the new master it never seen the doc,the old replica1 doing roll back.the doc lost.
Does this assumption exsit?