Java DCP Client missing updates

Hi!

I am experiencing some update problems with the new ‘Java DCP Client’ (https://github.com/couchbaselabs/java-dcp-client). It appears as though updates to documents on the server eventually stop being pushed to the client. Upon inspecting the logs, I see nothing indicating that anything has gone wrong. Am I using the client correctly?

Version info
com.couchbase.client:java-client version: '2.3.4'
com.couchbase.client:dcp-client version: '0.4.0'
Server Version: 3.1.3-1823 Enterprise Edition (build-1823)

Please see my code below.

/**
 * Wrapper around the Couchbase DCP {@code Client} that forwards document
 * mutations to a caller-supplied callback, persists the DCP session state
 * through {@code InitialStateService}, and logs a periodic health report.
 *
 * <p>Intended call order: {@code initialize(...)}, then {@code start()},
 * then {@code stop()}.
 */
@Slf4j
public class DcpClient {
    private final Client client;

    private final ScheduledExecutorService executorService;
    private ScheduledFuture<?> taskHandle;

    private BiConsumer<String, JsonObject> onNext;
    private Consumer<Throwable> onError;

    boolean initialized;
    boolean running;

    /** Number of control and data events seen since the last health check; reset by the scheduled task. */
    private final AtomicLong eventsLastMinute;

    /**
     * Builds the underlying DCP client from the given configuration. No
     * connection is opened here; that happens in {@code initialize}.
     *
     * @param couchbaseConfig source of node list, bucket, password, connection
     *                        name, buffer size and ack watermark
     */
    public DcpClient(CouchbaseConfig couchbaseConfig) {

        client = Client.configure()
                .hostnames(couchbaseConfig.getNodes())
                .bucket(couchbaseConfig.getBucketName())
                .password(couchbaseConfig.getPassword())
                .connectionNameGenerator(couchbaseConfig::getDcpConnectionName)
                .controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, couchbaseConfig.getDcpBufferSize())
                .bufferAckWatermark(couchbaseConfig.getDcpAckWatermark())
                .build();

        executorService = Executors.newSingleThreadScheduledExecutor();
        eventsLastMinute = new AtomicLong(0);
    }

    /**
     * Registers the event handlers, connects, and either recovers the persisted
     * session state or initializes a fresh one streaming from "now" to infinity.
     *
     * @param onNext  invoked with the key and parsed JSON body of every mutation
     * @param onError invoked when a connect/initialize/recover/rollback step fails
     * @return {@code true} if previously persisted state was found and recovered,
     *         {@code false} if a fresh state was initialized
     */
    public boolean initialize(BiConsumer<String, JsonObject> onNext, Consumer<Throwable> onError) {

        this.onNext = onNext;
        this.onError = onError;

        client.controlEventHandler(event -> {
            try {
                if (RollbackMessage.is(event)) {
                    log.debug("RollbackMessage");

                    short partition = RollbackMessage.vbucket(event);
                    long seqNo = RollbackMessage.seqno(event);

                    // Subscribe instead of blocking with await(): this callback is
                    // presumably invoked on the client's I/O thread, and waiting there
                    // for a response could stall the stream -- TODO confirm against
                    // the client's threading model. Errors still reach onError;
                    // onErrorComplete() keeps them from also hitting the global
                    // unhandled-error hook.
                    client.rollbackAndRestartStream(partition, seqNo)
                            .doOnCompleted(() -> log.info("Rollback for partition {} complete", partition))
                            .doOnError(onError::accept)
                            .onErrorComplete()
                            .subscribe();

                } else if (DcpOpenStreamResponse.is(event)) {
                    log.debug("Unhandled 'DcpOpenStreamResponse' event");

                } else if (DcpStreamEndMessage.is(event)) {
                    log.debug("Unhandled 'DcpStreamEndMessage' event");

                } else if (DcpSnapshotMarkerMessage.is(event)) {
                    log.debug("Unhandled 'DcpSnapshotMarkerMessage' event");

                } else if (DcpFailoverLogResponse.is(event)) {
                    log.debug("Unhandled 'DcpFailoverLogResponse' event");

                } else if (DcpCloseStreamResponse.is(event)) {
                    log.debug("Unhandled 'DcpCloseStreamResponse' event");

                } else if (DcpGetPartitionSeqnosResponse.is(event)) {
                    log.debug("Unhandled 'DcpGetPartitionSeqnosResponse' event");
                } else {
                    log.debug("Unhandled DCP event of unknown type");
                }

                eventsLastMinute.getAndIncrement();
            } finally {
                // Always acknowledge and release, even when handling throws, so the
                // buffer is not leaked and its bytes are not left unacknowledged.
                try {
                    client.acknowledgeBuffer(event);
                } finally {
                    event.release();
                }
            }
        });

        client.dataEventHandler(event -> {
            try {
                if (DcpMutationMessage.is(event)) {

                    String key = DcpMutationMessage.keyString(event);
                    String content = DcpMutationMessage.content(event).toString(CharsetUtil.UTF_8);
                    JsonObject jsonObject = JsonObject.fromJson(content);

                    onNext.accept(key, jsonObject);
                }

                eventsLastMinute.getAndIncrement();
            } finally {
                // A malformed document or a throwing callback must not skip the
                // acknowledgement: with flow control enabled the server presumably
                // stops sending once unacknowledged bytes fill the connection
                // buffer -- TODO confirm.
                try {
                    client.acknowledgeBuffer(event);
                } finally {
                    event.release();
                }
            }
        });

        client.connect()
                .doOnCompleted(() -> log.info("DCP client successfully connected"))
                .doOnError(onError::accept)
                .await();

        byte[] blob = InitialStateService.fetchState();
        boolean initialStateAvailable = blob != null && blob.length > 0;

        if (!initialStateAvailable) {
            client.initializeState(StreamFrom.NOW, StreamTo.INFINITY)
                    .doOnCompleted(() -> log.info("DCP state successfully initialized"))
                    .doOnError(onError::accept)
                    .await();
        } else {
            client.recoverState(StateFormat.JSON, blob)
                    .doOnCompleted(() -> log.info("DCP state successfully recovered"))
                    .doOnError(onError::accept)
                    .await();
        }

        initialized = true;
        return initialStateAvailable;
    }

    /**
     * Persists the current state, starts streaming, and schedules a health
     * check every 60 seconds that warns about closed partition streams and
     * persists the state whenever events were received since the last run.
     *
     * @throws IllegalStateException if {@code initialize} has not completed
     */
    public void start() {

        if (!initialized) {
            throw new IllegalStateException("Not initialized");
        }

        persistState();
        client.startStreaming()
                .doOnCompleted(() -> log.info("DCP stream successfully started"))
                .doOnError(onError::accept)
                .await();

        taskHandle = executorService.scheduleWithFixedDelay(() -> {

            // Catch everything: an exception escaping a scheduled task suppresses
            // all of its subsequent executions.
            try {

                for (short vbid = 0; vbid < client.numPartitions(); vbid++) {
                    boolean open = client.streamIsOpen(vbid);
                    if (!open) {
                        log.warn("Stream is not open for vBucket: {}", vbid);
                    }
                }

                long events = eventsLastMinute.getAndSet(0);
                if (events > 0) {
                    log.info("Received {} DCP events received in the last minute", events);
                    persistState();
                } else {
                    log.error("No DCP events received in the last minute");
                }

            } catch (Throwable t) {
                log.error("Periodic DCP health check failed", t);
            }

        }, 60, 60, TimeUnit.SECONDS);

        running = true;
    }

    /**
     * Cancels the health check, stops streaming and disconnects the client.
     *
     * @throws IllegalStateException if {@code start} has not been called
     */
    public void stop() {

        if (!running) {
            throw new IllegalStateException("Not running");
        }

        taskHandle.cancel(true);
        taskHandle = null;
        // NOTE(review): executorService is never shut down, so its worker thread
        // outlives stop(). Left as-is because start() reuses the same executor.

        client.stopStreaming()
                .doOnCompleted(() -> log.info("DCP stream successfully stopped"))
                .doOnError(onError::accept)
                .await();

        client.disconnect()
                .doOnCompleted(() -> log.info("DCP client successfully disconnected"))
                .doOnError(onError::accept)
                .await();

        running = false;
    }

    /** Exports the client's session state as JSON and hands it to {@code InitialStateService}. */
    private void persistState() {
        log.info("Persisting current DCP state");
        byte[] blob = client.sessionState().export(StateFormat.JSON);
        InitialStateService.persistState(blob);
    }
}

BR
Alex

Can you try with 0.5.0 and see if it's still broken, please?

It seems to be working now. Thank you!

1 Like

Though this is an old thread, please have a look at Couchbase Eventing, introduced in Couchbase Server 5.5:

Overview and Demo Talk : https://www.youtube.com/watch?v=SXa6PJEuaHY
Products Page : https://www.couchbase.com/products/eventing
Blog: https://blog.couchbase.com/eventing/
Documentation : https://developer.couchbase.com/documentation/server/5.5/eventing/eventing-overview.html