I try to use SSL between DCP client and couchbase server, with a minimum test client code. Got NPE in connection initialization. It works fine when SSL parameters are commented out.
I know that dcp client is still work in progress. Has anyone managed to use SSL or is it known not to work?
Environment:
Client:
DCP Client version 0.12.0
Core IO version 1.5.3
Java SDK version 2.5.3
Server 5.0.0
Stack trace of NPE is
Exception in thread “main” java.lang.NullPointerException
at com.couchbase.client.dcp.conductor.Conductor.masterChannelByPartition(Conductor.java:289)
at com.couchbase.client.dcp.conductor.Conductor.access$6(Conductor.java:283)
at com.couchbase.client.dcp.conductor.Conductor$9.call(Conductor.java:207)
at com.couchbase.client.dcp.conductor.Conductor$9.call(Conductor.java:1)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.internal.util.ScalarSynchronousObservable$WeakSingleProducer.request(ScalarSynchronousObservable.java:276)
at rx.Subscriber.setProducer(Subscriber.java:211)
in my debug I see that client received config from server
{“rev”:4163,“name”:“myRater”,“uri”:"/pools/default/buckets/myRater?bucket_uuid=c6041a71b30ddc09e58ae5eb69ebd789",“streamingUri”:"/pools/default/bucketsStreaming/myRater?bucket_uuid=c6041a71b30ddc09e58ae5eb69ebd789",“nodes”:[{“couchApiBase”:“http://:8092$HOST/myRater%2Bc6041a71b30ddc09e58ae5eb69ebd789",“hostname”:"$HOST:8091",“ports”:{“proxy”:11211,“direct”:11210}}],“nodesExt”:[{“services”:{“mgmt”:8091,“mgmtSSL”:18091,“indexAdmin”:9100,“indexScan”:9101,“indexHttp”:9102,“indexStreamInit”:9103,“indexStreamCatchup”:9104,“indexStreamMaint”:9105,“indexHttps”:19102,“capiSSL”:18092,“capi”:8092,“kvSSL”:11207,“projector”:9999,“kv”:11210,“moxi”:11211,“n1ql”:8093,“n1qlSSL”:18093},“thisNode”:true}],“nodeLocator”:“vbucket”,“uuid”:“c6041a71b30ddc09e58ae5eb69ebd789”,“ddocs”:{“uri”:"/pools/default/buckets/myRater/ddocs"},“vBucketServerMap”:{“hashAlgorithm”:“CRC”,“numReplicas”:0,“serverList”:["$HOST:11210"],“vBucketMap”:[[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0],[0]]},“bucketCapabilitiesVer”:"",“bucketCapabilities”:[“xattr”,“dcp”,“cbhello”,“touch”,“couchapi”,“cccp”,“xdcrCheckpointing”,"nodesExt”]}
Then a copy of NodeInfo without sslService is first created, then another instance with sslService is created in AbstractBucketConfig constructor. However when Conductor code throws NPE
NodeInfo node = config.nodeAtIndex(index);
for (DcpChannel ch : channels) {
InetSocketAddress address = new InetSocketAddress(node.hostname().nameOrAddress(),
(env.sslEnabled() ? node.sslServices() : node.services()).get(ServiceType.BINARY));
config.nodeAtIndex(index) call returns the NodeInfo instance without sslService, causing a null in map get.
The code of managing configuratin is in core-io, not in dcp client.
So is there a bug in java client core code or how dcp client use it?
My test client is
public class DCPClient {
public static void main(String[] args) throws InterruptedException {
// Connect to localhost and use the travel-sample bucket
final Client client = Client.configure()
.hostnames("localhost")
.bucket("myRater")
.sslEnabled(true)
.sslKeystoreFile("/Users/shenjin/Downloads/rater-example/keystore")
.sslKeystorePassword("password")
.build();
// Don't do anything with control events in this example
client.controlEventHandler(new ControlEventHandler() {
@Override
public void onEvent(ChannelFlowController flowController, ByteBuf event) {
event.release();
}
});
// Print out Mutations and Deletions
client.dataEventHandler(new DataEventHandler() {
@Override
public void onEvent(ChannelFlowController flowController, ByteBuf event) {
if (DcpMutationMessage.is(event)) {
System.out.println("Mutation: " + DcpMutationMessage.toString(event));
// You can print the content via DcpMutationMessage.content(event).toString(CharsetUtil.UTF_8);
} else if (DcpDeletionMessage.is(event)) {
System.out.println("Deletion: " + DcpDeletionMessage.toString(event));
}
event.release();
}
});
// Connect the sockets
client.connect().await();
// Initialize the state (start now, never stop)
client.initializeState(StreamFrom.NOW, StreamTo.INFINITY).await();
// Start streaming on all partitions
client.startStreaming().await();
// Sleep for some time to print the mutations
// The printing happens on the IO threads!
Thread.sleep(TimeUnit.MINUTES.toMillis(10));
// Once the time is over, shutdown.
client.disconnect().await();
}
}