Solved

Configuring the Kafka connector to read data concurrently from multiple partitions of a topic

  • 12 September 2019
  • 4 replies
  • 23 views


I am using the Kafka connector to consume data from a Kafka topic. The topic has 5 partitions. I want to configure the FME job in such a way that the Kafka connector consumes data from the topic at the partition level, making the partition the unit of parallelism for better speed. Does the Kafka connector have a built-in feature to handle partition parallelism, hiding it from the developer? That way I could be assured that parallelism takes place with a single Kafka connector in the job. Please confirm.


Best answer by gerhardatsafe 13 September 2019, 01:48


4 replies


Hi @rbeerak,

The KafkaConnector will use all available partitions by default if the starting offset is set to earliest or latest.

To scale for faster throughput, a workspace consuming from a Kafka topic can be published to FME Server and then submitted to run on multiple engines in parallel. As long as all running workspaces specify the same consumer group ID in the KafkaConnector, they will not consume duplicate messages but will instead spread the load of consumed messages among the FME Server jobs. To make sure a job is restarted immediately in case of a crash, the RTC (run until canceled) option should be used in the advanced Run Workspace parameters (https://docs.safe.com/fme/html/FME_Server_Documentation/WebUI/Run-Workspace.htm).
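For illustration, here is a minimal sketch of the same consumer-group mechanics using the confluent-kafka Python client (an assumed stand-in for illustration only; the KafkaConnector handles this internally, and the broker address, group ID, and topic name below are placeholders):

```python
# Minimal consumer-group sketch with the confluent-kafka Python client.
# Broker address, group id, and topic name are placeholders.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",  # placeholder broker
    "group.id": "fme-engines",           # same group id on every engine
    "auto.offset.reset": "earliest",     # matches "starting offset = earliest"
})
consumer.subscribe(["my-topic"])         # placeholder topic

# Kafka assigns each partition of the topic to exactly one consumer in the
# group, so several processes running this loop split the load between them
# without consuming duplicates.
while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    print(msg.partition(), msg.offset(), msg.value())
```

Each engine running the workspace plays the role of one such consumer in the group.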

 

I hope this answers your questions and helps!


Thanks for getting back, Gerhard. Can you please confirm my understanding?

1. I will create a single job where the Kafka connector is configured with a topic.

2. Assuming my topic has 4 partitions, I will have this same job assigned to 4 engines and schedule them.

3. Schedule the job on 4 engines. Each job instance will read one partition, as below, for optimal consumption.

Job instance (running in Engine 1) will read Partition 1

Job instance (running in Engine 2) will read Partition 2

Job instance (running in Engine 3) will read Partition 3

Job instance (running in Engine 4) will read Partition 4

As long as the Engine# to Partition# mapping is one-to-one, we should expect optimal consumption.

4. In the case of an error while loading the target table when using Batch mode for Kafka consumption, the job should not commit the offset even though the Kafka connector has received the records. I believe the offset will be committed only when the entire batch of records is processed, meaning the entire job completed successfully with no errors.

 

5. I am thinking of creating a topic-partition-offset-log tracking table when running the Kafka FME job in batch mode. For example, if the job is configured to process 100 messages during a batch execution, I will make an entry in the topic-partition-offset-log table as the last step in the job with the following details:

Topic Name

Partition#

Offset (of the last consumed message, i.e., the max offset of the entire batch)

Number of messages read in the batch

Engine#

Total Duration of batch run

This table will help me keep track of the consumption processing.
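For illustration, here is a minimal sketch of what such a log table could look like, using SQLite purely as a stand-in for the actual target database (all table and column names are hypothetical):

```python
# Hypothetical topic-partition-offset-log table, sketched with SQLite.
# In the real job this would be a table in the target database, written
# as the last step of the batch run.
import sqlite3

conn = sqlite3.connect("consumption_log.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS topic_partition_offset_log (
        topic_name    TEXT,
        partition_no  INTEGER,
        last_offset   INTEGER,  -- max offset consumed in the batch
        message_count INTEGER,  -- number of messages read in the batch
        engine_no     INTEGER,
        duration_s    REAL      -- total duration of the batch run
    )
""")
conn.execute(
    "INSERT INTO topic_partition_offset_log VALUES (?, ?, ?, ?, ?, ?)",
    ("my-topic", 2, 104572, 100, 1, 42.7),  # example values only
)
conn.commit()
```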

 

Can you please confirm whether my understanding is right, especially on point (4)? Please let me know of any corrections or suggestions. Thanks so much for your help.

 


Sorry for my late response. Thanks for your inputs.


Hi @rbeerak,

Sorry for not getting back to you sooner.

Regarding 1. - 3.:

Yes, this looks like a good approach to balance the load of messages between multiple engines. Note that 1 engine will not necessarily consume from only 1 partition. The balancing is handled by Kafka and ZooKeeper, which make sure all consumers in the same group share the message load, independent of the partition. This is important so that all consumers (in our case, engines) keep consuming even if a broker holding a single partition of the topic fails. This means that while consuming with 4 engines you will very likely achieve optimal consumption, but whether 1 or multiple engines are sufficient depends more on the message load (messages per second) you expect to receive than on the number of partitions.
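To make that visible, here is a small sketch (again assuming the confluent-kafka Python client as a stand-in) that logs which partitions the group coordinator hands to a consumer on each rebalance; an engine may receive zero, one, or several partitions depending on how many group members are alive:

```python
# Rebalance-observation sketch: partition assignment is decided by the
# group coordinator, not pinned one-to-one by the consumer.
from confluent_kafka import Consumer

def on_assign(consumer, partitions):
    # Called on every rebalance with this consumer's new assignment.
    print("assigned:", [(p.topic, p.partition) for p in partitions])

consumer = Consumer({
    "bootstrap.servers": "broker:9092",  # placeholder
    "group.id": "fme-engines",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my-topic"], on_assign=on_assign)

while True:
    consumer.poll(timeout=1.0)  # callbacks fire from within poll()
```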

Regarding 4.:

Yes, you are correct. We currently rely on the auto-commit option of the Kafka library that we are using, so in the case of a crash it is possible to encounter duplicate message consumption if the messages received by the KafkaConnector have not been committed yet. A potential improvement on our side would be to implement an option for manually committing offsets in a workspace (this is currently not planned, but I will discuss it with development).
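For reference, manual offset committing at the client level would look roughly like this (a confluent-kafka sketch of the idea; the KafkaConnector does not expose this option today, as noted above):

```python
# Manual-commit sketch: offsets are committed only after the batch has
# been processed, so a crash mid-batch re-delivers instead of losing data.
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",  # placeholder
    "group.id": "fme-engines",
    "enable.auto.commit": False,         # disable the library's auto-commit
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my-topic"])

batch = []
while len(batch) < 100:                  # the 100-message batch from point 5
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    batch.append(msg)

# ... load the batch into the target table here ...

consumer.commit(asynchronous=False)      # commit only after the load succeeded
```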

Regarding 5.:

This sounds like a reasonable workaround for the case discussed in 4. One other thing you could try is the advanced options in the KafkaConnector: you could experiment with the auto.commit.interval.ms parameter to override the default of 5000 ms. If you decrease the interval, the consumed messages will be committed earlier. Since you know the number of messages in your batch, you might be able to come up with an interval that commits the messages right after they are consumed. If there are still some duplicates, I would also think about handling them downstream (e.g., a primary key constraint if you write the messages to a database) instead of keeping an additional offset table.
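Expressed as client properties, the tuning suggested above might look like the following (values are illustrative, not recommendations; the KafkaConnector's advanced options map to settings of this kind):

```python
# Illustrative auto-commit tuning; 1000 ms is an example value, not advice.
consumer_config = {
    "bootstrap.servers": "broker:9092",
    "group.id": "fme-engines",
    "enable.auto.commit": True,
    "auto.commit.interval.ms": 1000,  # default is 5000; smaller commits sooner
}
```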


I hope this helps and thanks a lot for your input! Please keep the feedback coming. This is a very new integration in FME with lots of complex scenarios and we are always keen to improve to make sure you'll have the best possible experience!


Thanks,


Gerhard
