Hi, I am trying to make the FME KafkaConnector working. I first tested this connector to produce to a local Kafka server without SSL and authentication. That all works fine. Â
Now I am trying to get it working on our company Kafka instance. This instance requires two-way SSL authentication, which means that I have client certificate, private key and password for the private key. I have configured the Kafka connector as in the screenshot below:Â
Â
The advanced options are set to:
 Â
with this configuration, I get the following error message:
Â
This message is very non-descriptive; it could be anything. Also if I turn on further debug logging in FME, it doesn't give any more details about the cause of the error.
Â
As the FME Kafka connector is based on the Confluence Kafka Python library, I have downloaded and configured a producer using this library. The Python code is shown below:
Â
from confluent_kafka import ProducerÂ
Â
p = Producer({
    'security.protocol': 'SSL',
'bootstrap.servers':Â 'broker.streaming-dta.eneco.com:29093',
    'client.id': 'com-eneco-service-location-db',
'ssl.key.password':Â 'XXXXXXXXXXXXXXXXXX',
    'ssl.key.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.key',
    'ssl.certificate.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.cer',
    'ssl.ca.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.ca'
     })
Â
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} Â{}]'.format(msg.topic(), msg.partition()))
Â
msg=u'My first message','My second message']
Â
for data in msg:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)
Â
    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('eneco-dta-test-odp-servicelocation-ingest-json-1', data.encode('utf-8'), callback=delivery_report)
Â
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
Â
This code works without any issues. So in principle it should work with FME as well.
Â
Has anyone an idea what is wrong with the configuration in the FME Kafka Connector?Â
Â
I will file a support call on this too. We are using FME Desktop FME(R) 2020.2.3.0 (20210129 - Build 20820 - WIN64).
Â
Thanks and best regards,
Â
P/a Frank Cremer