I have Apache Kafka and Couchbase running on a local Ubuntu VM. I have a source connector that reads from MS SQL and a sink connector that writes to Couchbase. I'm currently having trouble with the sink connector.

When I provision the sink connector and check its status, it's running. However, after I update a record in my SQL DB and check the connector status again, I get this exception:

(screenshot of the exception from the connector status; image not available)
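A text copy of the trace is usually easier to search and quote than a screenshot. As a minimal sketch, assuming the Kafka Connect worker's REST API is reachable at the default `http://localhost:8083` and using a hypothetical connector name, the trace of any failed task can be pulled from the `/connectors/{name}/status` endpoint like this:

```python
import json
from urllib.request import urlopen


def failed_task_traces(status):
    """Return (task_id, trace) pairs for every task reported as FAILED."""
    return [
        (task["id"], task.get("trace", ""))
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def fetch_status(connector_name, base_url="http://localhost:8083"):
    """Fetch the status document for one connector from the Connect REST API.

    base_url is an assumption (the default Connect REST port); adjust it
    to match your worker configuration.
    """
    with urlopen(f"{base_url}/connectors/{connector_name}/status") as resp:
        return json.load(resp)


if __name__ == "__main__":
    # "couchbase-sink" is a hypothetical connector name; use your own.
    for task_id, trace in failed_task_traces(fetch_status("couchbase-sink")):
        print(f"task {task_id} failed:\n{trace}")
```

The first lines of the printed trace normally name the exception class and the method involved.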

I have verified that the data is flowing into the Kafka topics and streams. The only part that doesn't work is writing to Couchbase: Couchbase is not updated, and I get the exception in the connector status.

Here is my connector JSON config:

{
"connector.class":"com.couchbase.connect.kafka.CouchbaseSinkConnector",
"tasks.max":"1",
"topics":"weconnect-customers-sink",
"connection.cluster_address":"127.0.0.1",
"connection.ssl.enabled":"false",
"connection.bucket":"accounts",
"connection.username":"Administrator",
"connection.password":"Couchbase",
"couchbase.durability.persist_to":"NONE",
"couchbase.durability.replicate_to":"NONE",
"couchbase.remove.document.id": "true",
"couchbase.document.id": "${/id}",
"auto.offset.reset":"latest",

"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",

"transforms": "joltFields,replaceFields",

"transforms.joltFields.transformType": "chainr",
"transforms.joltFields.transformSpec": "[{\"operation\":\"shift\",\"spec\":{\"*\":\"&\",\"CHANNELS_*\":\"CHANNELS[]\",\"ADDRESS_CITY\": \"ADDRESS.CITY\",\"ADDRESS_COUNTRYCODE\":\"ADDRESS.COUNTRYCODE\",\"ADDRESS_POSTALCODE\":\"ADDRESS.POSTALCODE\",\"ADDRESS_REGION\":\"ADDRESS.REGION\",\"ADDRESS_STREETHOUSENUMBER\":\"ADDRESS.STREETHOUSENUMBER\", \"COMPANYCODE\":\"COMPANY.CODE\",\"CUSTOMERTYPE\":\"CUSTOMERTYPE.CODE\", \"DOCUMENT_TYPE\": \"DOCUMENTTYPE\"}}]",
"transforms.joltFields.type": "com.pvh.kafka.connect.transforms.JoltFields",

"couchbase.n1ql.clause.fields": "documentType:customer,company.code,customerNumber",
"couchbase.n1ql.clause": "WHERE",
"couchbase.n1ql.operation": "UPSERT",
"couchbase.document.mode": "N1QL",
"couchbase.subdocument.create_document": "false",
"couchbase.create_document": "true",

"transforms.replaceFields.type": "com.pvh.kafka.connect.transforms.ReplaceFields",
"transforms.replaceFields.schema": "{\"type\": \"record\",\"name\": \"CustomersValue\",\"namespace\": \"com.pvh.digitalshowroom\",\"fields\": [{\"name\": \"channels\",\"type\": [ \"null\",{\"type\": \"array\",\"items\": \"string\"}],\"default\": null,\"aliases\": [ \"CHANNELS\" ]},{\"name\": \"id\",\"type\": \"string\",\"aliases\": [ \"id\",\"ID\"]},{\"name\": \"customerNumber\",\"type\": \"string\",\"aliases\": [ \"customerNumber\",\"CUSTOMERNUMBER\"]},{\"name\": \"name\",\"type\": \"string\",\"aliases\": [ \"NAME\"]},{\"name\": \"languageCode\",\"type\": \"string\",\"aliases\": [ \"languagecode\",\"LANGUAGECODE\"]},{\"name\": \"phone\",\"type\": \"string\",\"aliases\": [ \"PHONE\"]},{\"name\": \"remarks\",\"type\": \"string\",\"aliases\": [ \"REMARKS\"]},{\"name\": \"vatCode\",\"type\": \"string\",\"aliases\": [ \"vatcode\",\"VATCODE\"]},{\"name\": \"address\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"address\",\"fields\": [{\"name\": \"city\",\"type\": \"string\",\"aliases\": [ \"CITY\",\"city\"]},{\"name\": \"postalCode\",\"type\": \"string\",\"aliases\": [ \"POSTALCODE\"]},{\"name\": \"region\",\"type\": \"string\",\"aliases\": [ \"REGION\"]},{\"name\": \"streetHouseNumber\",\"type\": \"string\",\"aliases\": [ \"STREETHOUSENUMBER\"]},{\"name\": \"countryCode\",\"type\": \"string\",\"aliases\": [ \"COUNTRYCODE\"]}]},\"aliases\": [ \"ADDRESS\"]},{\"name\": \"company\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"company\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"COMPANYCODE\"]}]},\"aliases\": [ \"COMPANY\"]},{\"name\": \"customerType\",\"type\":{\"type\": \"record\",\"namespace\": \"com.pvh.digitalshowroom\",\"name\": \"customerType\",\"fields\": [{\"name\": \"code\",\"type\": \"string\",\"aliases\": [ \"CODE\",\"CUSTOMERTYPE\"]}]},\"aliases\": [ \"CUSTOMERTYPE\"]},{\"name\": \"doumentType\",\"type\": \"string\",\"aliases\": [ 
\"DOCUMENTTYPE\"]}]}"

}

Can anybody tell me what is wrong with my config? This config worked in the previous version of the connector (3.3.0), but not in the current version (3.4.4). The only things I've changed are:

  • changed couchbase.document.id to ${/id} (it used to be /id; the documentation says to change this)
  • added "connection.ssl.enabled":"false"
  • tried all the following values for connection.cluster_address (no difference): 127.0.0.1, localhost, 10.0.2.15 (local IP), 127.0.0.1:8091, localhost:8091, 10.0.2.15:8091
  • (verified I can browse to the Couchbase admin UI at 127.0.0.1:8091)

The NoSuchMethodError indicates there might be an incompatible version of the Couchbase core-io or java-client library somewhere on your connector's class path. Make sure there aren't any old versions of these libraries (or old versions of the connector jar) left in your installation directory, for example from the previous 3.3.0 install.
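One way to check for leftovers is to scan the installation directory for the same artifact in more than one version. The following is a rough sketch, not part of the connector tooling. It assumes the jars follow the usual `artifact-version.jar` naming (for example `core-io-<version>.jar`), and the directory path in the usage section is a placeholder you would replace with your own plugin location.

```python
import re
from collections import defaultdict
from pathlib import Path

# Artifact names to watch for. These file-name prefixes are assumptions
# based on the usual Maven naming; adjust them to match your install.
WATCHED = ("core-io", "java-client", "kafka-connect-couchbase")

# Splits e.g. "core-io-1.7.2.jar" into artifact "core-io" and version "1.7.2".
JAR_PATTERN = re.compile(r"^(?P<artifact>.+?)-(?P<version>\d[\w.\-]*)\.jar$")


def find_conflicting_jars(root):
    """Return {artifact: {version: [paths]}} for every watched artifact
    that appears in more than one version anywhere under root."""
    found = defaultdict(lambda: defaultdict(list))
    for jar in Path(root).rglob("*.jar"):
        match = JAR_PATTERN.match(jar.name)
        if match and match["artifact"] in WATCHED:
            found[match["artifact"]][match["version"]].append(str(jar))
    return {
        artifact: dict(versions)
        for artifact, versions in found.items()
        if len(versions) > 1
    }


if __name__ == "__main__":
    # Placeholder path: point this at your Connect plugin / install directory.
    conflicts = find_conflicting_jars("/path/to/connect/plugins")
    for artifact, versions in conflicts.items():
        print(f"{artifact} is present in several versions:")
        for version, paths in sorted(versions.items()):
            for path in paths:
                print(f"  {version}: {path}")
```

If this reports more than one version of any artifact, remove the copies that did not ship with the connector version you are running, then restart the Connect worker so the class path is rebuilt.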
