I am using the Kafka connector in batch mode to read and process 10,000 messages per batch execution. I want to make sure the Kafka connector commits the offsets for those 10,000 messages. In case of any failure in the job, if I restart it (using the EARLIEST option), I want the Kafka connector to read messages starting from the offset right after the last successful batch run. Please advise.
@rbeerak
Manually committing offsets is currently not supported in the KafkaConnector. One thing you could try is the advanced options in the KafkaConnector: experiment with the auto.commit.interval.ms parameter to override the default of 5000 ms. If you decrease the interval, the consumed messages will be committed earlier. Since you know the number of messages in your batch, you might be able to come up with an interval that commits the messages right after they are consumed.
We are looking into options to support manually committing offsets with the KafkaConnector.
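For reference, here is roughly what that setting controls on a plain Java Kafka consumer. This is a minimal sketch with placeholder broker, group id, and topic names, not the connector's own code:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitIntervalSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "batch-consumer");             // placeholder consumer group
        props.put("auto.offset.reset", "earliest");          // the "EARLIEST" starting offset
        props.put("enable.auto.commit", "true");             // auto commit (the default)
        props.put("auto.commit.interval.ms", "500");         // commit sooner than the 5000 ms default
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Offsets of records returned by poll() are committed in the
                    // background every auto.commit.interval.ms on subsequent polls.
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```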
Thank you @gerhardatsafe. I will probably increase auto.commit.interval.ms and see how it works for my use case. It would be a good feature to support a manual commit property, especially when consumption runs in batch mode. That would make error handling much easier. Thanks for your support.
@gerhardatsafe I have used the maximum value for auto.commit.interval.ms. However, I still do not have control over committing the Kafka offset.
Here is my job flow/configuration:
1. Kafka connector (acts as consumer), configured to run with:
"BATCH" mode
"BATCH SIZE": 2000 (number of messages per batch)
"EARLIEST" for starting offset
{"auto.commit.interval.ms": 500000}
2. JSON parsing
3. Load parsed tags into PostgreSQL with commit count: 2000
Issue:
When there is an issue with the table load (probably a data conversion error) during job execution, the job fails with an error, but the Kafka offset still gets committed. This means that when I rerun the job after fixing the table, consumption starts from the next set of messages, and the messages from the previously failed batch are lost.
I am looking for some kind of resolution here; otherwise I cannot handle this kind of failure, which is prone to happen.
Can you please help me? I appreciate your support.
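For illustration, the flow above maps roughly onto the following plain Java consumer sketch (loadIntoPostgres is a hypothetical stand-in for the JSON parse and PostgreSQL load steps). One possible explanation, assuming the connector behaves like the standard client: with enable.auto.commit set to true, the consumer also commits its current position when it is closed, so even a very large auto.commit.interval.ms does not keep the failed batch's offsets from being committed.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BatchLoadSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "batch-loader");               // placeholder consumer group
        props.put("auto.offset.reset", "earliest");          // "EARLIEST" starting offset
        props.put("enable.auto.commit", "true");             // auto commit still on
        props.put("auto.commit.interval.ms", "500000");      // very large interval
        props.put("max.poll.records", "2000");               // "BATCH SIZE": 2000
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources closes the consumer even when the load throws;
        // with auto commit enabled, close() commits the current position,
        // so the offsets of the failed batch end up committed anyway.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(5));
            loadIntoPostgres(batch);   // hypothetical parse + PostgreSQL load; may throw
        }
    }

    static void loadIntoPostgres(ConsumerRecords<String, String> batch) {
        // stand-in for step 2 (JSON parsing) and step 3 (PostgreSQL load, commit count 2000)
        throw new RuntimeException("simulated data conversion error during table load");
    }
}
```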
Thanks, @rbeerak, for the update.
I added a ticket to our backlog to look into options for a more "manual" commit that would avoid the scenario you are describing.
I will let you know once there is some progress on this.
I'm happy to let you know that the KafkaConnector now supports manual commits. Once you upgrade to the newest version, you will see a new Action called Commit. To use it, add your receiving KafkaConnector with the Action set to Receive and Auto Commit set to No; then, in the committing KafkaConnector, reference the Receiving Connector Identifier returned by the first KafkaConnector and set your offset and partition.
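For anyone curious what this corresponds to at the Kafka client level, here is a rough sketch of the receive-then-commit pattern with the plain Java consumer (placeholder names again; this illustrates the underlying mechanism, not the connector's Receive/Commit actions themselves):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "batch-loader");               // placeholder consumer group
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");            // "Auto Commit" = No
        props.put("max.poll.records", "2000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));   // placeholder topic
            ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(5));

            for (ConsumerRecord<String, String> record : batch) {
                // parse and load into PostgreSQL here; if this throws, nothing
                // has been committed and a rerun re-reads the same batch
            }

            // Commit explicitly, per partition and offset, only after the load succeeded.
            for (TopicPartition partition : batch.partitions()) {
                long lastOffset = batch.records(partition)
                        .get(batch.records(partition).size() - 1).offset();
                consumer.commitSync(Map.of(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
```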
Let us know if you have any feedback!