About class loading isolation and converter class

Hi everyone,

I’m having problems when I try to use my own ‘dcp.message.converter.class’.

I’ve created a project with a single class that extends ‘com.couchbase.connect.kafka.handler.source.SourceHandler’. This class builds a custom message to be sent to the Kafka topic, so I set ‘dcp.message.converter.class’ to the name of this class.

To do this, I set the ‘plugin.path’ property in ‘connect-standalone.properties’ to include both the kafka-connect-couchbase-<version> directory and the directory containing my own project with the converter class:

plugin.path=<path>/kafka-connect-couchbase-3.2.1-SNAPSHOT/,<path>/custom-schema/

But when I run the source connector it fails with an exception.

java.lang.ClassNotFoundException: com.source.CustomSchemaSourceHandler

What am I doing wrong?

Thanks in advance.
Juan

Hi Juan,

Each path in plugin.path defines a separate class loader. The Couchbase connector can’t find your custom handler because it belongs to a different class loader.

Put both the Couchbase connector JAR and your custom handler JAR in the same directory, and set plugin.path to that one directory.

Cheers,
David

EDIT: This was completely incorrect.

Hi David,

Thanks for the reply. I put both JARs in the same directory, but I’m still getting the same exception.
I can see that the loader is registered:

[2017-10-31 10:14:44,598] INFO Loading plugin from: /home/user/Descargas/kafka-connect-couchbase-3.2.1-SNAPSHOT/custom-schema-0.0.1-SNAPSHOT.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
[2017-10-31 10:14:44,601] INFO Registered loader: PluginClassLoader{pluginLocation=file:/home/user/Descargas/kafka-connect-couchbase-3.2.1-SNAPSHOT/custom-schema-0.0.1-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)

but it doesn’t recognize my custom class and it fails with the same ClassNotFoundException.
Do you have any idea what is wrong?

Thank you for your patience.
Juan

Hi Juan,

Sorry, I was wrong about how plugin.path works.

When a plugin consists of multiple JARs, the JARs must be in a directory that is an immediate child of a plugin path entry. For example:

plugin.path=<plugin-path-entry>

<plugin-path-entry>/
└── couchbase/
    ├── kafka-connect-couchbase-<version>.jar
    └── custom-schema-0.0.1-SNAPSHOT.jar

For reference, here’s how plugin.path is described by the Kafka Connect config documentation:

List of paths separated by commas (,) that contain plugins (connectors, converters, transformations). The list should consist of top level directories that include any combination of:
a) directories immediately containing jars with plugins and their dependencies
b) uber-jars with plugins and their dependencies
c) directories immediately containing the package directory structure of classes of plugins and their dependencies
Note: symlinks will be followed to discover dependencies or plugins.
Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors
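
To make the layout rule concrete, here is a minimal sketch that lists how the contents of one plugin.path entry would be grouped: each immediate child directory counts as one plugin location holding the JARs directly inside it, and each JAR sitting directly in the entry counts as a standalone uber-jar. This only approximates the rules quoted above. It is not Kafka Connect's own scanning code, and the class and method names (PluginPathLayout, pluginLocations) are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Connect.
class PluginPathLayout {

    /** Maps each immediate child of a plugin.path entry to the JAR names grouped under it. */
    static Map<String, List<String>> pluginLocations(Path entry) throws IOException {
        Map<String, List<String>> result = new TreeMap<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(entry)) {
            for (Path child : children) {
                String name = child.getFileName().toString();
                if (Files.isDirectory(child)) {
                    // A child directory: collect the JARs that sit directly inside it.
                    List<String> jars = new ArrayList<>();
                    try (DirectoryStream<Path> inner = Files.newDirectoryStream(child, "*.jar")) {
                        for (Path jar : inner) {
                            jars.add(jar.getFileName().toString());
                        }
                    }
                    Collections.sort(jars);
                    result.put(name, jars);
                } else if (name.endsWith(".jar")) {
                    // A JAR directly in the entry: treated as a self-contained uber-jar.
                    result.put(name, Collections.singletonList(name));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Pass the plugin.path entry as the first argument; defaults to the current directory.
        Path entry = Paths.get(args.length > 0 ? args[0] : ".");
        pluginLocations(entry).forEach((location, jars) ->
                System.out.println(location + " -> " + jars));
    }
}
```

If the connector JAR and the custom handler JAR show up under the same key, they share one plugin location. If they show up under different keys, they are in separate locations, which matches the ClassNotFoundException reported earlier in this thread.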

Hi David,

it works! Now I can use my custom converter class.

Thank you very much!
Juan

Hi again David,

I’m having problems with this again, this time in a dev environment with Docker.

[2017-11-22 10:25:13,209] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:65)
[2017-11-22 10:25:13,220] INFO WorkerInfo values: 
	jvm.args = -Xmx256M, -XX:+UseG1GC, -XX:MaxGCPauseMillis=20, -XX:InitiatingHeapOccupancyPercent=35, -XX:+ExplicitGCInvokesConcurrent, -Djava.awt.headless=true, -Dcom.sun.management.jmxremote, -Dcom.sun.management.jmxremote.authenticate=false, -Dcom.sun.management.jmxremote.ssl=false, -Dkafka.logs.dir=/opt/kafka/bin/../logs, -Dlog4j.configuration=file:/opt/kafka/bin/../config/connect-log4j.properties
	jvm.spec = Oracle Corporation, Java HotSpot(TM) 64-Bit Server VM, 1.8.0_144, 25.144-b01
	jvm.classpath = :/opt/kafka/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/opt/kafka/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/bin/../libs/commons-lang3-3.5.jar:/opt/kafka/bin/../libs/connect-api-1.0.0.jar:/opt/kafka/bin/../libs/connect-file-1.0.0.jar:/opt/kafka/bin/../libs/connect-json-1.0.0.jar:/opt/kafka/bin/../libs/connect-runtime-1.0.0.jar:/opt/kafka/bin/../libs/connect-transforms-1.0.0.jar:/opt/kafka/bin/../libs/guava-20.0.jar:/opt/kafka/bin/../libs/hk2-api-2.5.0-b32.jar:/opt/kafka/bin/../libs/hk2-locator-2.5.0-b32.jar:/opt/kafka/bin/../libs/hk2-utils-2.5.0-b32.jar:/opt/kafka/bin/../libs/jackson-annotations-2.9.1.jar:/opt/kafka/bin/../libs/jackson-core-2.9.1.jar:/opt/kafka/bin/../libs/jackson-databind-2.9.1.jar:/opt/kafka/bin/../libs/jackson-jaxrs-base-2.9.1.jar:/opt/kafka/bin/../libs/jackson-jaxrs-json-provider-2.9.1.jar:/opt/kafka/bin/../libs/jackson-module-jaxb-annotations-2.9.1.jar:/opt/kafka/bin/../libs/javassist-3.20.0-GA.jar:/opt/kafka/bin/../libs/javassist-3.21.0-GA.jar:/opt/kafka/bin/../libs/javax.annotation-api-1.2.jar:/opt/kafka/bin/../libs/javax.inject-1.jar:/opt/kafka/bin/../libs/javax.inject-2.5.0-b32.jar:/opt/kafka/bin/../libs/javax.servlet-api-3.1.0.jar:/opt/kafka/bin/../libs/javax.ws.rs-api-2.0.1.jar:/opt/kafka/bin/../libs/jersey-client-2.25.1.jar:/opt/kafka/bin/../libs/jersey-common-2.25.1.jar:/opt/kafka/bin/../libs/jersey-container-servlet-2.25.1.jar:/opt/kafka/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/opt/kafka/bin/../libs/jersey-guava-2.25.1.jar:/opt/kafka/bin/../libs/jersey-media-jaxb-2.25.1.jar:/opt/kafka/bin/../libs/jersey-server-2.25.1.jar:/opt/kafka/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-http-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-io-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-security-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-server-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jetty-util-9.2.22.v20170606.jar:/opt/kafka/bin/../libs/jopt-simple-5.0.4.jar:/opt/kafka/bin/../libs/kafka-clients-1.0.0.jar:/opt/kafka/bin/../libs/kafka-log4j-appender-1.0.0.jar:/opt/kafka/bin/../libs/kafka-streams-1.0.0.jar:/opt/kafka/bin/../libs/kafka-streams-examples-1.0.0.jar:/opt/kafka/bin/../libs/kafka-tools-1.0.0.jar:/opt/kafka/bin/../libs/kafka_2.12-1.0.0-sources.jar:/opt/kafka/bin/../libs/kafka_2.12-1.0.0-test-sources.jar:/opt/kafka/bin/../libs/kafka_2.12-1.0.0.jar:/opt/kafka/bin/../libs/log4j-1.2.17.jar:/opt/kafka/bin/../libs/lz4-java-1.4.jar:/opt/kafka/bin/../libs/maven-artifact-3.5.0.jar:/opt/kafka/bin/../libs/metrics-core-2.2.0.jar:/opt/kafka/bin/../libs/osgi-resource-locator-1.0.1.jar:/opt/kafka/bin/../libs/plexus-utils-3.0.24.jar:/opt/kafka/bin/../libs/reflections-0.9.11.jar:/opt/kafka/bin/../libs/rocksdbjni-5.7.3.jar:/opt/kafka/bin/../libs/scala-library-2.12.3.jar:/opt/kafka/bin/../libs/slf4j-api-1.7.25.jar:/opt/kafka/bin/../libs/slf4j-log4j12-1.7.25.jar:/opt/kafka/bin/../libs/snappy-java-1.1.4.jar:/opt/kafka/bin/../libs/validation-api-1.1.0.Final.jar:/opt/kafka/bin/../libs/zkclient-0.10.jar:/opt/kafka/bin/../libs/zookeeper-3.4.10.jar
	os.spec = Linux, amd64, 3.10.0-327.49.2.el7.x86_64
	os.vcpus = 8
 (org.apache.kafka.connect.runtime.WorkerInfo:71)
[2017-11-22 10:25:13,221] INFO Scanning for plugin classes. This might take a moment ... (org.apache.kafka.connect.cli.ConnectStandalone:74)
[2017-11-22 10:25:13,235] INFO Loading plugin from: /files/conf (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:179)
[2017-11-22 10:25:13,305] INFO Registered loader: PluginClassLoader{pluginLocation=file:/files/conf/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:202)
[2017-11-22 10:25:13,307] INFO Loading plugin from: /files/jars (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:179)
[2017-11-22 10:25:14,034] INFO Registered loader: PluginClassLoader{pluginLocation=file:/files/jars/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:202)
[2017-11-22 10:25:14,035] INFO Added plugin 'com.couchbase.connect.kafka.CouchbaseSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:135)
[2017-11-22 10:25:14,035] INFO Added plugin 'com.couchbase.connect.kafka.CouchbaseSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:135)

It seems that the JARs are found, but the process does not continue after that last line.
When I run it in my local environment it works, and the next thing it logs is this:

[2017-11-22 11:27:52,614] INFO Registered loader: sun.misc.Launcher$AppClassLoader@764c12b6 (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)

and continues with the execution.
Any idea what the problem is?
Thanks again!

Juan

Hi Juan,

I haven’t seen this before, but here are some things to try:

  • Increase the log level for the connect worker; maybe there are some DEBUG messages with clues.

  • Can you compare the plugin JAR locations & plugin.path values between the Docker image and your local environment? Are they exactly the same?

  • What happens if you run the connector in Docker without classpath isolation? (I’m wondering if this issue is related to running in Docker)
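
As a quick way to compare the two environments, a small standalone check like the one below can confirm whether a given class actually loads from the JARs in one plugin directory. It is only a rough sketch: it uses a plain URLClassLoader rather than Kafka Connect's isolation class loaders, so it can rule out a packaging problem but cannot reproduce the worker's behavior. The class and method names (PluginClassCheck, canLoad) are hypothetical.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper, not part of Kafka Connect or the Couchbase connector.
class PluginClassCheck {

    /** Returns true if className loads from the JARs directly inside pluginDir (or from the parent loader). */
    static boolean canLoad(Path pluginDir, String className) throws IOException {
        List<URL> urls = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(pluginDir, "*.jar")) {
            for (Path jar : jars) {
                urls.add(jar.toUri().toURL());
            }
        }
        try (URLClassLoader loader = new URLClassLoader(
                urls.toArray(new URL[0]), PluginClassCheck.class.getClassLoader())) {
            // initialize=false: locate and load the class without running its static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | NoClassDefFoundError e) {
            // Either the class itself or one of its supertypes is missing from these JARs.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java PluginClassCheck <plugin-dir> <fully.qualified.ClassName>
        Path dir = Paths.get(args[0]);
        System.out.println(args[1] + " loadable: " + canLoad(dir, args[1]));
    }
}
```

Running it inside the Docker container and on the local machine against the same directory and with the handler class name (com.source.CustomSchemaSourceHandler in this thread) shows whether the JAR contents differ between the two.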

When you say it “does not continue with process after that last line”… does the connect worker hang?

Thanks,
David

Hi David,

I ran the connector without classpath isolation and it works. Maybe the issue is related to running in Docker, because I checked many times that the JARs were in the right place.
For now I will run the connector using the CLASSPATH environment variable.
Thanks again for your help.
Cheers,
Juan