Hi, I am trying to make the FME KafkaConnector working. I first tested this connector to produce to a local Kafka server without SSL and authentication. That all works fine.
Now I am trying to get it working on our company Kafka instance. This instance requires two-way SSL authentication, which means that I have client certificate, private key and password for the private key. I have configured the Kafka connector as in the screenshot below:
The advanced options are set to:
with this configuration, I get the following error message:
This message is very non-descriptive; it could be anything. Also if I turn on further debug logging in FME, it doesn't give any more details about the cause of the error.
As the FME Kafka connector is based on the Confluence Kafka Python library, I have downloaded and configured a producer using this library. The Python code is shown below:
from confluent_kafka import Producer
p = Producer({
'security.protocol': 'SSL',
'bootstrap.servers': 'broker.streaming-dta.eneco.com:29093',
'client.id': 'com-eneco-service-location-db',
'ssl.key.password': 'XXXXXXXXXXXXXXXXXX',
'ssl.key.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.key',
'ssl.certificate.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.cer',
'ssl.ca.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.ca'
})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
msg=['My first message','My second message']
for data in msg:
# Trigger any available delivery report callbacks from previous produce() calls
p.poll(0)
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
p.produce('eneco-dta-test-odp-servicelocation-ingest-json-1', data.encode('utf-8'), callback=delivery_report)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
This code works without any issues. So in principle it should work with FME as well.
Has anyone an idea what is wrong with the configuration in the FME Kafka Connector?
I will file a support call on this too. We are using FME Desktop FME(R) 2020.2.3.0 (20210129 - Build 20820 - WIN64).
Thanks and best regards,
P/a Frank Cremer