
Hi,

I’m getting data from a streaming source (Kafka) in which each feature contains multiple geometries stored as WKT. To process the geometries, I deaggregate the feature first, create a geometry from each WKT string, and then try to aggregate the parts back together with the Aggregator transformer. Since stream processing doesn’t allow blocking transformers, I have set the Group Processing parameter to “When Group Changes”.

My issue with this solution is that features only leave the Aggregator when the group actually changes, which means only when the next feature is read. Is there a way to force the latest group to be output, or to make the “Deaggregate/Produce geometry/Aggregate back” process handle each feature on its own?
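
To make the delay concrete, here is a small plain-Python model of “When Group Changes” processing (not FME code; the `group_key` and `wkt` field names are made up for illustration). A group is only released when a feature with a different key arrives, so the newest group always waits for the next feature.

```python
from typing import Any, Dict, Iterable, Iterator, List

Feature = Dict[str, Any]


def aggregate_when_group_changes(
    features: Iterable[Feature], key: str = "group_key"
) -> Iterator[List[Feature]]:
    """Release a group only when a feature with a different key arrives."""
    current: List[Feature] = []
    for feature in features:
        if current and feature[key] != current[-1][key]:
            yield current  # the previous group is released only now
            current = []
        current.append(feature)
    if current:
        yield current  # end-of-input flush: a live stream never reaches this


def demo() -> List[str]:
    """Record when features are read and when groups leave the aggregator."""
    log: List[str] = []
    source = [
        {"group_key": "A", "wkt": "POINT (0 0)"},
        {"group_key": "A", "wkt": "POINT (1 1)"},
        {"group_key": "B", "wkt": "POINT (2 2)"},
    ]

    def reader() -> Iterator[Feature]:
        for feature in source:
            log.append("read " + feature["wkt"])
            yield feature

    for group in aggregate_when_group_changes(reader()):
        log.append("emit " + ", ".join(f["wkt"] for f in group))
    return log


if __name__ == "__main__":
    for line in demo():
        print(line)
```

Group A is emitted only after the first B feature has been read. The final flush only happens because this demo's input ends; on a Kafka stream that never happens, so group B would sit in the buffer until some later feature arrives.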

 

Edit: this is probably possible with a PythonCaller, but I’m wondering whether there is a “native” solution.
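
For the PythonCaller route, a rough sketch of the idea: do the deaggregate/build/aggregate steps for one feature inside a single `input()` call, so each feature is output immediately and no group state is kept between features. The class mirrors the shape of the PythonCaller template (`input`, `close`, `self.pyoutput`), but it is an assumption-laden stand-in: features are plain dicts, `wkt_list` is a hypothetical attribute, and `parse_point_wkt` is a toy parser (POINT only) in place of real geometry creation. Inside FME, `pyoutput` is supplied by the PythonCaller. Here it is passed in so the sketch runs standalone.

```python
from typing import Any, Callable, Dict, List, Tuple

Point = Tuple[float, float]


def parse_point_wkt(wkt: str) -> Point:
    """Toy stand-in for the geometry-producing step: handles 'POINT (x y)' only."""
    body = wkt.strip()
    if not body.upper().startswith("POINT"):
        raise ValueError(f"unsupported WKT: {wkt!r}")
    inner = body[body.index("(") + 1 : body.rindex(")")]
    x, y = inner.split()
    return float(x), float(y)


class FeatureProcessor:
    """Same shape as the PythonCaller class template (input/close)."""

    def __init__(self, pyoutput: Callable[[Dict[str, Any]], None]) -> None:
        self.pyoutput = pyoutput  # provided by FME inside a real PythonCaller

    def input(self, feature: Dict[str, Any]) -> None:
        wkt_parts: List[str] = feature["wkt_list"]  # hypothetical attribute
        # "Deaggregate", build each part and "aggregate back" in one call,
        # so this feature is output before the next one is read.
        parts = [parse_point_wkt(w) for w in wkt_parts]
        out = dict(feature)
        out["geometry"] = parts  # a list of parts stands in for an aggregate
        self.pyoutput(out)

    def close(self) -> None:
        pass  # nothing is buffered, so there is nothing to flush


if __name__ == "__main__":
    outputs: List[Dict[str, Any]] = []
    proc = FeatureProcessor(outputs.append)
    proc.input({"id": 1, "wkt_list": ["POINT (0 0)", "POINT (1 2)"]})
    print(outputs)
```

The design choice is simply to avoid crossing feature boundaries: because the parts never leave the feature, there is no group to wait for.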

Really hacky, but I guess you could add a dummy feature to force the group to change, then filter it out (throw it away) after the group-based processing?

I tested this with the Aggregator, with some success.

This was my setup:
 


With the AttributeCreator creating the dummy features, the first real feature came through 5 seconds earlier than when it was disabled.
 



I used some time calculations in the log to check this, but you can also see it from the feature counts.
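
A rough plain-Python model of the dummy-feature workaround described above (not FME code; `fid`, `parts`, `part` and `is_dummy` are made-up names). Each incoming feature is deaggregated into its parts and followed by a dummy with a unique group key, so the group change happens straight away. The only thing left waiting in the aggregator is the dummy, which the filter step discards.

```python
from itertools import count
from typing import Any, Dict, Iterable, Iterator, List

Feature = Dict[str, Any]


def aggregate_when_group_changes(features: Iterable[Feature], key: str) -> Iterator[List[Feature]]:
    """Yield a group only when the next feature has a different key."""
    current: List[Feature] = []
    for f in features:
        if current and f[key] != current[-1][key]:
            yield current
            current = []
        current.append(f)
    if current:
        yield current


def deaggregate_with_dummy(features: Iterable[Feature], key: str) -> Iterator[Feature]:
    """Split each feature into parts, then add a dummy with a unique key
    (the AttributeCreator step) to force an immediate group change."""
    ids = count()
    for f in features:
        for part in f["parts"]:
            yield {key: f[key], "part": part}
        yield {key: f"__dummy_{next(ids)}__", "is_dummy": True}


def drop_dummies(groups: Iterable[List[Feature]]) -> Iterator[List[Feature]]:
    """The filter step: throw away dummy features after aggregation."""
    for group in groups:
        real = [f for f in group if not f.get("is_dummy")]
        if real:
            yield real


def demo() -> List[str]:
    log: List[str] = []
    source = [{"fid": 1, "parts": ["a", "b"]}, {"fid": 2, "parts": ["c"]}]

    def reader() -> Iterator[Feature]:
        for f in source:
            log.append(f"read {f['fid']}")
            yield f

    stream = deaggregate_with_dummy(reader(), key="fid")
    for group in drop_dummies(aggregate_when_group_changes(stream, key="fid")):
        log.append("emit " + "".join(p["part"] for p in group))
    return log


if __name__ == "__main__":
    print(demo())
```

In the log, the aggregate for feature 1 is emitted before feature 2 is read, which is the effect seen in the test above. In a real workspace, this depends on the dummy actually arriving after the last part of its feature.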


I was trying to write from a stream to a database using a FeatureWriter in Group mode and never found a satisfactory way to get the last group of features written in a timely manner.

I think I’d look at getting the part geometries from the aggregate without deaggregating, if possible.

 



 

It’s almost as if there needs to be a group size parameter, or an attribute value, that we could use to tell FME a group is complete, rather than relying on the group changing.


I created an idea for this.
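
A plain-Python sketch of the group-size idea (hypothetical behaviour, not an existing FME parameter): each feature carries the size of its group, and the group is released as soon as that many features have arrived. In the original question the number of WKT parts per feature is known before deaggregating, so it could be stored on each part. `group_id` and `group_size` are made-up attribute names.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List

Feature = Dict[str, Any]


class CountCompleteAggregator:
    """Hypothetical aggregator: a group is complete once it holds the number
    of features announced in an attribute on the features themselves."""

    def __init__(self, emit: Callable[[List[Feature]], None],
                 key: str = "group_id", size_attr: str = "group_size") -> None:
        self.emit = emit
        self.key = key
        self.size_attr = size_attr
        self.pending: DefaultDict[Any, List[Feature]] = defaultdict(list)

    def input(self, feature: Feature) -> None:
        group_key = feature[self.key]
        group = self.pending[group_key]
        group.append(feature)
        if len(group) >= int(feature[self.size_attr]):
            # Complete: release it now instead of waiting for another group.
            self.emit(self.pending.pop(group_key))


if __name__ == "__main__":
    out: List[List[Feature]] = []
    agg = CountCompleteAggregator(out.append)
    agg.input({"group_id": 7, "group_size": 2, "part": "a"})
    agg.input({"group_id": 7, "group_size": 2, "part": "b"})
    print(out)
```

Keying the pending groups by ID also means interleaved groups do not release each other early, unlike group-change processing.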

 



In my use case, I’d want something along the lines of “assume the group is complete if no new features have been received in x minutes”, since I was using a TimeWindower to create the groups.
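
A plain-Python sketch of that idle-timeout rule (hypothetical, not an FME feature): a group is released either on a normal group change or once no feature has arrived for `idle_seconds`. The clock is injectable so the behaviour can be tested, and `window_id` is a made-up attribute name standing in for whatever identifies the time window. Something outside the feature flow has to call `poll()` on a timer; the sketch leaves that to the caller.

```python
import time
from typing import Any, Callable, Dict, List, Optional

Feature = Dict[str, Any]


class IdleTimeoutAggregator:
    """Release the current group on a group change, or after a quiet period."""

    def __init__(self, emit: Callable[[List[Feature]], None], idle_seconds: float,
                 key: str = "window_id",
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.emit = emit
        self.idle_seconds = idle_seconds
        self.key = key
        self.clock = clock
        self.buffer: List[Feature] = []
        self.last_arrival: Optional[float] = None

    def _flush(self) -> None:
        if self.buffer:
            self.emit(self.buffer)
        self.buffer = []
        self.last_arrival = None

    def input(self, feature: Feature) -> None:
        if self.buffer and feature[self.key] != self.buffer[-1][self.key]:
            self._flush()  # the usual "when group changes" release
        self.buffer.append(feature)
        self.last_arrival = self.clock()

    def poll(self) -> None:
        """Call periodically; flushes the group once it has been idle long enough."""
        if self.last_arrival is not None and self.clock() - self.last_arrival >= self.idle_seconds:
            self._flush()
```

With a 60-second timeout, a group whose last feature arrived at t = 30 s is released by the first `poll()` at or after t = 90 s.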

 

(I ended up writing each feature with a SQLExecutor instead.)
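
The per-feature approach amounts to one small insert and commit per feature, so nothing waits for a group to complete. A minimal sketch, assuming Python's built-in `sqlite3` as a stand-in for the real database; the `features` table and its `fid`/`wkt` columns are hypothetical.

```python
import sqlite3
from typing import Any, Dict


def write_feature(conn: sqlite3.Connection, feature: Dict[str, Any]) -> None:
    """Write one feature as soon as it arrives (per-feature, not per-group)."""
    with conn:  # commits on success, rolls back if the insert raises
        conn.execute(
            "INSERT INTO features (fid, wkt) VALUES (?, ?)",  # parameterised, not string-built
            (feature["fid"], feature["wkt"]),
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE features (fid INTEGER, wkt TEXT)")
    write_feature(conn, {"fid": 1, "wkt": "POINT (0 0)"})
    print(conn.execute("SELECT COUNT(*) FROM features").fetchone()[0])
```

The trade-off is throughput: a round trip and a commit per feature costs more than a batched group write, but latency stays low on a stream.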

