NullPointerException if one of the Couchbase nodes goes down

Hello,
I need your assistance with a Couchbase connection issue.
We have an 8-node Couchbase cluster, but if one of the nodes goes down, a java.lang.NullPointerException is thrown when we try to get the collection. We expected the client to connect to the other nodes when one of the nodes goes down.

Error Logs

java.lang.NullPointerException
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxxxxxx.getCollection(MarketCouchbaseDao.java:157)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxxxxxx.lambda$findByIdList$3(MarketCouchbaseDao.java:93)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:272)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:230)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
at reactor.core.publisher.Mono.subscribe(Mono.java:4400)
at reactor.core.publisher.Mono.block(Mono.java:1706)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxxxxxx.findByIdList(CampaignCouchbaseDao.java:110)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxxxxxx.$FastClassBySpringCGLIB$$6ae6890f.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:736)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.framework.adapter.MethodBeforeAdviceInterceptor.invoke(MethodBeforeAdviceInterceptor.java:52)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.aspectj.AspectJAfterThrowingAdvice.invoke(AspectJAfterThrowingAdvice.java:62)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:671)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxxxxxx.$EnhancerBySpringCGLIB$$fe0057fc.findByIdList()
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxxxx.xxxxxx.xxxxxxx.lambda$getMarketEntityMap$3(Logic.java:103)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.logic.component.Logic.getCampaignEntityMap(Logic.java:102)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.functions.processor.AdSalesUpdateProcessor.getCampaignEntity(AdSalesUpdateProcessor.java:85)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.functions.processor.AdSalesUpdateProcessor.processValidatedItem(AdSalesUpdateProcessor.java:62)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.functions.processor.AdSalesUpdateProcessor.processValidatedItem(AdSalesUpdateProcessor.java:34)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.template.processor.InputBeanValidatingItemProcessor.process(InputBeanValidatingItemProcessor.java:57)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.template.processor.InputBeanValidatingItemProcessor$$FastClassBySpringCGLIB$$63fc2e15.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:736)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:671)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.functions.processor.AdSalesUpdateProcessor$$EnhancerBySpringCGLIB$$6d99aab5.process()
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:272)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:81)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:374)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:257)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:200)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:392)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:306)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy116.run(Unknown Source)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:211)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:227)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:123)
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:117)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:732)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:716)
at org.springframework.boot.SpringApplication.afterRefresh(SpringApplication.java:703)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:304)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.template.BatchApplicationBase.run(BatchApplicationBase.java:37)
at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.functions.BatchApplication.main(BatchApplication.java:32)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Mono.block(Mono.java:1707)
… 73 more

java.lang.NullPointerException at xx.xx.xxxxxxx.xxxxxxx.xxx.xxxxx.xxx.xxxxxxxx.xxxx xxx.getCollection(MarketCouchbaseDao.java:157)

That’s not Couchbase code. I assume it’s your code, so you’ll need to figure out what the issue is there. A node becoming unavailable could cause requests to that node through the KV API to time out. If the request goes through the query API, the SDK will simply use a different query node.

@mreiche The NullPointerException is happening here. I believe it occurs while trying to get the bucket. My understanding is that if a node is down, auto-failover will happen and the client will connect to a healthy node. But that is not happening. I don’t think the issue is with the application; I think it occurs while trying to connect to Couchbase.

    protected Collection getCollection() {
        return this.bucket.scope(scopeName).collection(collectionName);
    }
    @Bean
    @Qualifier(value = "marketBucket")
    public Bucket marketBucket(){
        try {
            Cluster couchbaseCluster = couchbaseCluster(getEnvironment());
            Bucket bucket = this.getBucket(couchbaseCluster, marketBucket);
            bucket.waitUntilReady(Duration.ofSeconds(60));
            return bucket;
        } catch (InvalidKeyException | NoSuchAlgorithmException | NoSuchPaddingException
                 | IllegalBlockSizeException | BadPaddingException e) {
            throw new BeanInitializationException("Failed to init data source for market Bucket.", e);
        } catch (Exception ex) {
            LOG.warn("Unable to get connection to couchbase Market buckets", ex);
        }
        return null;
    }
    private Bucket getBucket(final Cluster couchbaseCluster, final String bucketName) {
        return couchbaseCluster.bucket(bucketName);
    }
    public Cluster couchbaseCluster(final ClusterEnvironment clusterEnvironment)
            throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException {

        return Cluster.connect(hosts, ClusterOptions
                .clusterOptions(clusterUserName.trim(), cipherHelper.decrypt(clusterPassword.trim()))
                .environment(clusterEnvironment));
    }

It looks like this.bucket is null. Do you ever assign anything to it?

I see that in marketBucket() you assign a value to

Bucket bucket = this.getBucket(couchbaseCluster, marketBucket)

But that is not the class field this.bucket.

Maybe there was an exception, but you only log it as a warning. It may be an exception completely unrelated to the node being down (like a wrong username/password).

} catch (Exception ex) {
    LOG.warn("Unable to get connection to couchbase Market buckets", ex);
}
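To illustrate why a swallowed exception resurfaces later as a NullPointerException, here is a minimal, SDK-free sketch. The `connect` supplier is a hypothetical stand-in for the Couchbase calls in marketBucket(), and the class and method names are made up for illustration; this is not the poster's actual code.

```java
import java.util.function.Supplier;

final class FailFastBeanSketch {

    // Mirrors the posted pattern: log the failure and continue, so callers receive null.
    static <T> T createOrNull(Supplier<T> connect) {
        try {
            return connect.get();
        } catch (RuntimeException ex) {
            System.err.println("Unable to get connection: " + ex.getMessage());
            return null;
        }
    }

    // Fail-fast alternative: rethrow with the original cause attached,
    // so startup stops at the real error instead of at a later NPE.
    static <T> T createOrFail(Supplier<T> connect) {
        try {
            return connect.get();
        } catch (RuntimeException ex) {
            throw new IllegalStateException("Failed to init data source", ex);
        }
    }

    public static void main(String[] args) {
        // Hypothetical connection attempt that always fails.
        Supplier<String> failing = () -> { throw new RuntimeException("timed out"); };

        // The caller gets null here and would hit an NPE on first use.
        System.out.println(createOrNull(failing));

        try {
            createOrFail(failing);
        } catch (IllegalStateException e) {
            // The original cause is still available for diagnosis.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

With the fail-fast variant, the stack trace points at the connection problem itself rather than at getCollection().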

@mreiche Yes, the bucket is getting assigned, and the code works almost all the time. However, when one of the nodes is down (8-node cluster), the application fails.

Yes, it is possible that there is some error which is not logged.

catch (Exception ex) {
    LOG.warn("Unable to get connection to couchbase Market buckets", ex);
}

But shouldn’t it connect to a healthy node instead of the application failing?

Yes, it is possible that there is some error which is not logged.
But shouldn’t it connect to a healthy node instead of the application failing?

That error will have information in it that will help. I suspect it will say that waitUntilReady timed out because the cluster is not fully ready: by default, it waits for the ONLINE state. If you pass in the options WaitUntilReadyOptions.waitUntilReadyOptions().desiredState(ClusterState.DEGRADED), then it might let you continue.
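As a rough sketch of that suggestion, assuming Couchbase Java SDK 3.x on the classpath (the class name DegradedWaitSketch and the helper openBucket are hypothetical, not part of the SDK or of the posted code):

```java
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.diagnostics.WaitUntilReadyOptions;

import java.time.Duration;

final class DegradedWaitSketch {

    // Opens the bucket and waits until the cluster reports the DEGRADED state,
    // rather than the default ONLINE state.
    static Bucket openBucket(Cluster cluster, String bucketName) {
        Bucket bucket = cluster.bucket(bucketName);
        bucket.waitUntilReady(
                Duration.ofSeconds(60),
                WaitUntilReadyOptions.waitUntilReadyOptions()
                        .desiredState(ClusterState.DEGRADED));
        return bucket;
    }
}
```

Whether this is enough depends on the actual exception being logged, so it is worth checking that first.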

But shouldn’t it connect to the healthy node

It’s not that simple. The ‘cluster’ actually connects to all the nodes. The active document resides only on one node (to avoid having multiple versions of the same document). If that node is down, the active document is not available. You can still access replicas of the document on other nodes if you use the getAnyReplica() operation. You can still access active documents that are on other nodes. And once a failover/rebalance completes, it will be back to normal, with all the documents being accessible. Just without the node that is down.
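A minimal sketch of the replica read described above, assuming Couchbase Java SDK 3.x and a bucket that has replicas configured. The class and method names are hypothetical, and falling back only on a timeout is an illustrative choice rather than a recommendation:

```java
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.java.Collection;
import com.couchbase.client.java.json.JsonObject;

final class ReplicaFallbackSketch {

    // Reads the active copy first; if that read times out (for example because
    // the node holding the active copy is down), reads from any available replica.
    static JsonObject readWithReplicaFallback(Collection collection, String id) {
        try {
            return collection.get(id).contentAsObject();
        } catch (TimeoutException e) {
            // May return a slightly stale copy; fails with an exception of its own
            // if no copy of the document can be reached.
            return collection.getAnyReplica(id).contentAsObject();
        }
    }
}
```

Note that the fallback only starts after the first read has waited for its KV timeout, so reads against a down node will be slower until failover completes.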

@mreiche I think I understand the issue.

  1. The Couchbase connect call tries to connect to all the hosts listed in the config file.
    Code: Cluster couchbaseCluster = couchbaseCluster(getEnvironment());

  2. It is able to connect to all the hosts except the one that is down.

  3. It keeps trying to connect to the node that is down for 1 minute.
    Code: bucket.waitUntilReady(Duration.ofSeconds(60));

  4. After 1 minute, it logs a warning
    Code: LOG.warn("Unable to get connection to couchbase Market buckets", ex);
    and prints the exception: com.couchbase.client.core.error.UnambiguousTimeoutException

  5. Since that is only a warning log, the code continues and returns null.
    Code: return null;

@mreiche Is there any way to handle this use case, where I want the application to ignore the failed node and continue?

Yes. Follow the instructions I gave above.

If you pass in options WaitUntilReadyOptions.waitUntilReadyOptions().desiredState(ClusterState.DEGRADED) then it might let you continue.
