Open Distro - Kafka Connect

Hi, I’m trying to use Kafka Connect with Open Distro, using the es-sink-connector.
I get an error: java.lang.IllegalStateException: Not a JSON Object: "Unauthorized".
I’m using the admin user to access the cluster from the connector, but I still get the same error.
I saw on a few blogs that there is a problem connecting kafka-connect-es-sink to Open Distro. Is that true?
Thanks!

I haven’t heard that, but from the documentation it seems like it should work. From the error message, it certainly looks like you’re not passing your credentials.

Yes, it’s a bit weird. This is the connector that I’m creating:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "simple-elasticsearch-connector2",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://od-master.internal:9200",
    "tasks.max": "1",
    "topics": "test",
    "key.ignore": "true",
    "name": "simple-elasticsearch-connector2",
    "connection.username": "admin",
    "connection.password": "admin",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "schema.ignore": "true",
    "type.name": "_doc"
  }
}'

(My admin user is admin:admin.)

"connection.url": "http://od-master.internal:9200",

Shouldn’t that be https?
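
One way to check this outside of Kafka Connect is to send the same credentials straight to the cluster over both schemes and compare the status codes. Below is a minimal Python sketch of that check, not a confirmed fix. The helper names (basic_auth_header, probe) are made up for illustration, and verify_tls=False assumes the cluster is still running with self-signed demo certificates.

```python
import base64
import ssl
import urllib.error
import urllib.request


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def probe(url: str, username: str, password: str, verify_tls: bool = True) -> int:
    """Return the HTTP status code the cluster answers with for these credentials.

    Connection-level failures (DNS, refused connection, TLS handshake) are
    raised as urllib.error.URLError or OSError and left to the caller.
    """
    request = urllib.request.Request(
        url, headers={"Authorization": basic_auth_header(username, password)}
    )
    context = None
    if url.startswith("https://"):
        context = ssl.create_default_context()
        if not verify_tls:
            # Assumption: self-signed demo certificates, so skip verification.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
    try:
        with urllib.request.urlopen(request, timeout=10, context=context) as response:
            return response.status
    except urllib.error.HTTPError as err:
        # 401 and similar responses arrive as HTTPError; report the code.
        return err.code


# Example call (needs network access to the cluster):
# probe("https://od-master.internal:9200", "admin", "admin", verify_tls=False)
```

If the https call returns 200 while the http call fails or returns an error, the connection.url scheme is the likely culprit. If both return 401, the credentials themselves are being rejected.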

@searchymcsearchface @pablo @idanl I am trying to do the same thing with kafka-connect-opensearch, but I have also enabled SASL_PLAINTEXT authentication, so when I create a new connector through the Kafka Connect REST API, it reports that authentication failed.

I need to understand which key-value pairs I have to pass in the JSON body for SASL_PLAINTEXT authentication. Currently my JSON body is:

{
    "name": "first-opensearch-connector",
    "config": {
        "name": "first-opensearch-connector",
        "connector.class": "com.dmathieu.kafka.opensearch.OpenSearchSinkConnector",
        "type.name": "_doc",
        "connection.password": "admin",
        "connection.username": "admin",
        "connection.url": "http://10.30.1.101:9200",
        "topics": "my_topic",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1",
        "key.ignore": "true",
        "schema.ignore": "true",
        "drop.invalid.message":"true",
        "behavior.on.malformed.documents":"fail",
        "write.method":"INSERT",
        "read.timeout.ms":"10000",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "errors.tolerance": "all",
        "transforms":"AddPrefix,TimestampRouter,InsertField",
        "transforms.AddPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.AddPrefix.regex":".*",
        "transforms.AddPrefix.replacement":"acme_$0",
        "transforms.TimestampRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.TimestampRouter.topic.format": "foo-bar-${topic}-${timestamp}",
        "transforms.TimestampRouter.timestamp.format": "YYYYMMdd",
        "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertField.static.field": "MessageSource",
        "transforms.InsertField.static.value": "Kafka Connect framework",
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\""
    }
}

I’m not an expert on this part of OpenSearch. You might want to cross-post it in the security category for more visibility.

Ok, thanks for the direction.
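
For anyone landing here with the same SASL question: the thread does not confirm a fix, but one thing worth checking is where the Kafka client settings live. In Kafka Connect, bare security.protocol / sasl.* keys inside a connector's "config" are not applied to the sink task's consumer. Per-connector client settings use the consumer.override. prefix, and the worker has to allow them with connector.client.config.override.policy=All. Also, a sasl.jaas.config value has to end with a semicolon. Below is a minimal Python sketch that rewrites a config that way. The function name with_consumer_overrides is made up for illustration, and whether this resolves the "authentication failed" message above is an assumption.

```python
import json

# Kafka client settings that a sink connector can only set per connector
# through the "consumer.override." prefix (worker must permit overrides).
CLIENT_SECURITY_KEYS = ("security.protocol", "sasl.mechanism", "sasl.jaas.config")
OVERRIDE_PREFIX = "consumer.override."


def with_consumer_overrides(config: dict) -> dict:
    """Return a copy of config with client security keys moved under the prefix."""
    result = {}
    for key, value in config.items():
        if key in CLIENT_SECURITY_KEYS:
            # A JAAS config string must be terminated by a semicolon.
            if key == "sasl.jaas.config" and not value.rstrip().endswith(";"):
                value = value.rstrip() + ";"
            result[OVERRIDE_PREFIX + key] = value
        else:
            result[key] = value
    return result


# Trimmed-down version of the config posted above.
example = {
    "connector.class": "com.dmathieu.kafka.opensearch.OpenSearchSinkConnector",
    "connection.url": "http://10.30.1.101:9200",
    "topics": "my_topic",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule '
                        'required username="admin" password="admin-secret"',
}

body = {"name": "first-opensearch-connector", "config": with_consumer_overrides(example)}
print(json.dumps(body, indent=2))
```

The printed JSON can be posted to the Connect REST API as before. Note that connection.username and connection.password are a separate matter: they authenticate the connector against OpenSearch, not against the Kafka brokers.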