Question

Best way to update prefetch cache in stream processing


stlj
Contributor
Hi! I work with stream processing and need to validate data in real time against a database, using for example a DatabaseJoiner. We have an MQ connector receiving large volumes of real-time data. For best performance, I need to cache the dataset I am validating against, but the question is: what is the best way to update the cache? The data is updated in the database once per hour, and there are 1,000,000 records in the table. A time-window change can trigger a cache update in FME, but I can’t figure out how to solve it, or which readers/transformers to use.

/Stefan

3 replies

mattmatsafe
Safer

Hi @stlj

You’re correct, the TimeWindower’s window changed port is the key. This article may help:

https://support.safe.com/hc/en-us/articles/25407533004429-Joining-to-external-datasets-when-running-in-stream-mode

If you’re already windowing your incoming stream, you can connect the WindowChanged port to a FeatureReader that reloads data from your table. Depending on your window duration, you may also want to enable a cache on the FeatureReader, as described in the article. For example, if you know the data is only updated every hour, but your windows are 5 minutes in duration, you could set a cache timeout of 1 hour so that the data isn’t reloaded every 5 minutes.
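Outside the FME canvas, that cache timeout amounts to a simple time-based reload. Here’s a minimal Python sketch of the idea, just to make the behaviour concrete (load_reference_set and CACHE_TTL_SECONDS are illustrative names, not part of any FME API):

```python
import time

CACHE_TTL_SECONDS = 3600  # mirrors a 1-hour FeatureReader cache timeout

_cache = {"data": None, "loaded_at": 0.0}

def load_reference_set():
    # Stand-in for the actual database read the FeatureReader would perform.
    return {("val1", "val2", "val3", "val4")}

def get_reference_set():
    """Return the cached dataset, reloading only once the TTL has expired."""
    now = time.time()
    if _cache["data"] is None or now - _cache["loaded_at"] > CACHE_TTL_SECONDS:
        _cache["data"] = load_reference_set()
        _cache["loaded_at"] = now
    return _cache["data"]
```

With 5-minute windows, get_reference_set() would be called on every window change but only hit the database once per hour.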

If you’re not currently windowing the incoming stream, you could still use this approach: split the incoming stream to a TimeWindower with a 1-hour duration and only use the WindowChanged port. The sole purpose of that TimeWindower is then to output WindowChanged features every hour and refresh the dataset.

I hope this helps!

PS: you may also find this article on windowing useful: https://support.safe.com/hc/en-us/articles/25407392899981-Windowing-Data-Streams-in-FME


stlj
Contributor
  • Author
  • April 17, 2025

Thanks Matt
I will look into the articles and give it another try. The ideal solution for me is the DatabaseJoiner with a prefetch query. Performance looks OK, but I need to re-run the prefetch query every hour, and I haven’t found a way to do that. My challenge is to validate four text attributes on each feature against a database table with 1,000,000 records. If I get one match, the feature is OK; if not, it needs to be marked as invalid and stored in the database for later correction. Our solution needs to handle more than 250 features/second. I tried reading the 1,000,000-record table into an in-memory SQLite database, creating an index on it, and then using a SQLExecutor to query the table. At a certain point, I truncate and fill the table again, and it looks like that works well. The time to repopulate the SQLite table is under 20 seconds, so that will be OK. I will work a little more on this and test to find a good solution.
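For anyone curious, the in-memory SQLite pattern looks roughly like this sketch (column names a–d stand in for my four text attributes; in the real workspace the query and refresh run through the SQLExecutor):

```python
import sqlite3

def build_lookup(rows):
    """Load the reference rows into an in-memory SQLite table with a composite index."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE ref (a TEXT, b TEXT, c TEXT, d TEXT)")
    con.executemany("INSERT INTO ref VALUES (?, ?, ?, ?)", rows)
    con.execute("CREATE INDEX idx_ref ON ref (a, b, c, d)")
    con.commit()
    return con

def refresh_lookup(con, rows):
    """Hourly refresh: truncate and repopulate; the index stays in place."""
    con.execute("DELETE FROM ref")
    con.executemany("INSERT INTO ref VALUES (?, ?, ?, ?)", rows)
    con.commit()

def is_valid(con, a, b, c, d):
    """True if the four attribute values match a reference record."""
    cur = con.execute(
        "SELECT 1 FROM ref WHERE a=? AND b=? AND c=? AND d=? LIMIT 1",
        (a, b, c, d),
    )
    return cur.fetchone() is not None
```

With the composite index, each lookup is a single index seek, which is what makes 250+ features/second feasible against a 1,000,000-row table.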
Thanks
Stefan 

 


mattmatsafe
Safer
Forum|alt.badge.img+13

Sounds good, Stefan. There are a few ways to approach this, and it will require some testing to find the most performant solution. SQLite is fast for reads and would be very quick to query against incoming streaming features; I asked a few developers about this and they agreed. It sounds like you’re on the right track. You could even look at a second process running on a schedule that extracts from your source DB hourly into an SQLite file stored on the engine server, which the engine should be able to read very quickly into memory. Something like: Source DB > SQLite > FeatureReader cache.
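A rough sketch of that hourly extract step, just to illustrate the shape of it (source_rows would come from your source DB reader, and dest_path is wherever the engine reads the SQLite file from; both are assumed names):

```python
import os
import sqlite3

def extract_to_sqlite(source_rows, dest_path):
    """Rebuild the reference SQLite file, then swap it into place so the
    streaming workspace never reads a half-written database."""
    tmp_path = dest_path + ".tmp"
    if os.path.exists(tmp_path):
        os.remove(tmp_path)
    con = sqlite3.connect(tmp_path)
    con.execute("CREATE TABLE ref (a TEXT, b TEXT, c TEXT, d TEXT)")
    con.executemany("INSERT INTO ref VALUES (?, ?, ?, ?)", source_rows)
    con.execute("CREATE INDEX idx_ref ON ref (a, b, c, d)")
    con.commit()
    con.close()
    os.replace(tmp_path, dest_path)  # atomic swap on the same filesystem
```

The os.replace swap is just one way to avoid the stream reading mid-refresh; a scheduled FME workspace writing to a temp table and renaming it would achieve the same thing.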

I think adding a cache/prefetch timeout to the DatabaseJoiner, like the FeatureReader has, would be a worthwhile idea to suggest for streaming workflows like this.

