
Hi, I am trying to get the FME KafkaConnector working. I first tested the connector by producing to a local Kafka server without SSL or authentication. That all works fine.

Now I am trying to get it working on our company Kafka instance. This instance requires two-way SSL authentication, which means that I have a client certificate, a private key and a password for the private key. I have configured the KafkaConnector as in the screenshot below:

img1 

The advanced options are set to:

 img2 

With this configuration, I get the following error message:

img3 

This message is very non-descriptive; it could be anything. Turning on further debug logging in FME does not give any more details about the cause of the error either.

 

As the FME KafkaConnector is based on the Confluent Kafka Python library (confluent_kafka), I have downloaded this library and configured a producer with it. The Python code is shown below:

 

from confluent_kafka import Producer 
 
p = Producer({
    'security.protocol': 'SSL',
    'bootstrap.servers': 'broker.streaming-dta.eneco.com:29093',
    'client.id': 'com-eneco-service-location-db',
    'ssl.key.password': 'XXXXXXXXXXXXXXXXXX',
    'ssl.key.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.key',
    'ssl.certificate.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.cer',
    'ssl.ca.location': 'C:\\Users\\504019\\source\\esp-servicelocation-db-dt.streaming-dta.eneco.com.ca'
})
 
def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
 
msg = ['My first message', 'My second message']
 
for data in msg:
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)
 
    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('eneco-dta-test-odp-servicelocation-ingest-json-1', data.encode('utf-8'), callback=delivery_report)
 
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

 

This code works without any issues, so in principle the same settings should work in FME as well.
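Since the FME error is so terse, one way to see how far the TLS handshake gets is to switch on librdkafka's own debug output in the standalone Python client. The sketch below only illustrates that idea and says nothing about what the KafkaConnector does internally: `log_error`, `with_debug` and `try_connect` are hypothetical helper names, while `debug` and `error_cb` are regular confluent_kafka configuration keys.

```python
def log_error(err):
    """Hypothetical callback: print client-level errors such as SSL handshake failures."""
    print('Kafka client error: {}'.format(err))


def with_debug(conf, contexts='security,broker'):
    """Return a copy of a client config with librdkafka debug logging enabled."""
    debug_conf = dict(conf)            # leave the caller's dict untouched
    debug_conf['debug'] = contexts     # comma-separated librdkafka debug contexts
    debug_conf['error_cb'] = log_error
    return debug_conf


def try_connect(conf, timeout=10):
    """Create a producer with debug output and force one metadata request."""
    # Imported here so the helpers above can be used without the library installed.
    from confluent_kafka import Producer, KafkaException
    p = Producer(with_debug(conf))
    try:
        p.list_topics(timeout=timeout)  # forces a broker connection
        print('Connected and fetched metadata')
    except KafkaException as exc:
        print('Metadata request failed: {}'.format(exc))
```

Calling `try_connect` with the same dictionary that is passed to `Producer` above should print the handshake steps to stderr, which may help when comparing against the FME log.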

 

Does anyone have an idea what is wrong with the configuration in the FME KafkaConnector?

 

I will file a support call on this too. We are using FME Desktop FME(R) 2020.2.3.0 (20210129 - Build 20820 - WIN64).

 

Thanks and best regards,

 

P/a Frank Cremer

We tried again yesterday with the exact same settings, and now it works! However, getting the list of topics from the Kafka server still does not work. It looks like the KafkaConnector does not use the advanced settings when querying for topics?
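To check whether topic listing works with the same SSL settings outside FME, something along these lines could be tried. It is a rough sketch: `ssl_client_config` and `list_topic_names` are hypothetical helpers, all argument values are placeholders, and whether the KafkaConnector really drops the advanced settings for this query is not confirmed here.

```python
def ssl_client_config(bootstrap, key, key_password, cert, ca):
    """Build the two-way SSL settings used by the producer earlier in the thread."""
    return {
        'bootstrap.servers': bootstrap,
        'security.protocol': 'SSL',
        'ssl.key.location': key,
        'ssl.key.password': key_password,
        'ssl.certificate.location': cert,
        'ssl.ca.location': ca,
    }


def list_topic_names(conf, timeout=10):
    """Ask the cluster for its metadata and return the topic names, sorted."""
    # Imported here so the config helper can be used without the library installed.
    from confluent_kafka.admin import AdminClient
    metadata = AdminClient(conf).list_topics(timeout=timeout)
    return sorted(metadata.topics)  # metadata.topics is a dict keyed by topic name
```

If `list_topic_names` returns the expected topics with the same certificate files, the broker side allows the metadata request and the problem is more likely in how FME issues it.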



Hello,

 

Did you eventually solve your issue?

 

We are desperately trying to get FME working with our Kafka cluster in SASL_SSL mode (SCRAM-SHA-512).

The Kafka cluster itself runs well: we are able to produce and consume from any Java or Python source code as well as via Apache NiFi, but in FME Desktop we get the following error message:

 

 

I don't get any further messages that would help me troubleshoot it.
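For comparison with whatever is entered in FME, this is roughly what a working confluent_kafka client needs for SASL_SSL with SCRAM-SHA-512. It is only a sketch: `sasl_ssl_config` is a hypothetical helper, and the argument values are placeholders rather than settings taken from this cluster.

```python
def sasl_ssl_config(bootstrap, username, password, ca_location=None):
    """Build librdkafka-style settings for SASL_SSL with SCRAM-SHA-512."""
    conf = {
        'bootstrap.servers': bootstrap,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'SCRAM-SHA-512',
        'sasl.username': username,
        'sasl.password': password,
    }
    if ca_location:
        # Only needed when the broker certificate is not signed by a CA
        # that the client already trusts by default.
        conf['ssl.ca.location'] = ca_location
    return conf
```

The resulting dictionary can be passed to `confluent_kafka.Producer` or `Consumer` (a consumer also needs a `group.id`); the same key names are what one would expect to supply as advanced options.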

 

Any ideas?

 

Thanks in advance.

 

LC

 

 

 

 


Hi @laule75, unfortunately we did not investigate this any further in FME, since we went for the Python solution. Frank Cremer may be able to help you further; I'll forward this to him.

Regards, Helmoet.



Many thanks @Helmoet de Zwijger​ for your prompt reply.

