I’m currently trying to connect Couchbase to Kafka using the Kafka Connector, but my bucket receives all kinds of documents. At my company we follow a convention that every document must have a “type” property, and I would like to open each MutationMessage and filter on this field.
What is the correct way of doing that?
So far I have created a MutationFilter that accepts only MutationMessages, decodes the content of each message into a JsonObject, and checks its “type” field. Like this:
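    import com.couchbase.client.core.message.dcp.MutationMessage
    import com.couchbase.client.java.transcoder.JsonTranscoder
    import com.couchbase.kafka.DCPEvent
    import com.couchbase.kafka.filter.Filter

    class MutationFilter extends Filter {
      // Reuse one transcoder instead of allocating a new one per event.
      private val trans = new JsonTranscoder()

      override def pass(evt: DCPEvent): Boolean = evt.message() match {
        // Only mutations carry a document body; keep those whose "type" matches.
        case m: MutationMessage =>
          trans.byteBufToJsonObject(m.content()).get("type") == "MessagePosition"
        case _ => false
      }
    }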
To do this I have to depend on the couchbase-client library.
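Since the bucket holds all kinds of documents, some of them may not be valid JSON or may lack the “type” field, and I suspect the filter should treat those as "do not pass" instead of throwing. Below is a minimal sketch of that idea, not a definitive implementation. To keep it runnable on its own it does not use the connector: `Message`, `Mutation`, `Other` and `TypeFilter` are hypothetical stand-ins for the DCP message types, and `parse` stands in for whatever JSON decoder is used (for example the JsonTranscoder above).

```scala
import scala.util.Try

// Hypothetical stand-ins for the connector's message types (not the real API).
sealed trait Message
final case class Mutation(content: Array[Byte]) extends Message
final case class Other(name: String) extends Message

object TypeFilter {
  // `parse` is an assumed decoder that may throw on input that is not JSON.
  def pass(msg: Message, wanted: String)(parse: Array[Byte] => Map[String, Any]): Boolean =
    msg match {
      case Mutation(bytes) =>
        // A parse failure and a missing "type" field both mean "do not pass".
        Try(parse(bytes)).toOption.flatMap(_.get("type")).contains(wanted)
      case _ => false
    }
}
```

In the real filter the same shape would wrap the decoding call in `Try` inside `pass`, so a single malformed document cannot break the stream.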
Is there a better way of doing this? Is this the right way?