Hi

New to FME. Familiar with ETL/ELT tools.

I would like to read rows from a Postgres DB and load them into Snowflake DB.

I want to do it incrementally, i.e. read from Postgres only the rows that have changed since the last time the workspace ran. I have a timestamp field in the table named "last_updated".

The query should look like: "select * from table where last_updated > <previous_max_last_updated>".

I thought about using a special management table that would hold the last last_updated value for each source table, but that would require an extra reader and writer in the workspace.

 

Is there a way to hold variables from session to session? I mean, from one workspace run to the next?

 

 

Thanks!

 

Hi @benros, 

I'm not sure such a parameter exists. In any case, it's not in the workspace .fmw file as far as I can see.

However, this is what I think you could do:

1. You can create a Python scripted parameter that returns the name of a logfile to be used in your FME Workspace, for example (here I assume a job is not run more than once a day):

# Build a date-stamped log file name, e.g. mylogfile_20240115.log
from datetime import datetime
return 'mylogfile_' + datetime.today().strftime('%Y%m%d') + '.log'

2. The workspace's log file name should be linked to the value of that scripted parameter. You can set the log file name from the Navigator, under Workspace Parameters > Logging.

3. In another scripted parameter you can check the available logfiles and select the most recent one from before the current run. That scripted parameter should return the date part of the logfile name it finds (see the sketch after this list).

4. This last scripted parameter can then be used to select your records in the WHERE clause of the database reader.
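
Just as a rough sketch, that second scripted parameter could look something like this (the log directory and file name pattern are assumptions, and the '20000101' fallback is only for the very first run; adjust to your setup):

# Find the most recent previous logfile and return its date part (YYYYMMDD)
# for use in the reader's WHERE clause. Assumes logs are written to
# C:\fme\logs as mylogfile_YYYYMMDD.log.
import glob
import os
import re
from datetime import datetime

log_dir = r'C:\fme\logs'
today = datetime.today().strftime('%Y%m%d')

dates = []
for path in glob.glob(os.path.join(log_dir, 'mylogfile_*.log')):
    match = re.search(r'mylogfile_(\d{8})\.log$', os.path.basename(path))
    if match and match.group(1) < today:  # skip the current run's own logfile
        dates.append(match.group(1))

return max(dates) if dates else '20000101'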

No extra transformer needed :-) Not sure if this is neat FME common practice, though...

Such a parameter might be a nice idea to raise, however.


@benros I think using your own suggestion of a history table or special management table is probably the most robust solution. Adding an extra reader isn't a big overhead for FME. The pattern in your workspace would be something like:

Postgresql (history table) -> SQLExecutor or FeatureReader (data tables)
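
If you want to avoid the extra reader altogether, you could also fetch that value in a Python scripted parameter instead. Just a sketch: the connection details, table and column names are made up, and it assumes the psycopg2 module is available to FME's Python interpreter:

# Read the previous max last_updated for this workspace from a
# hypothetical etl_history table in Postgres.
import psycopg2

conn = psycopg2.connect(host='myhost', dbname='mydb',
                        user='fme_user', password='secret')
try:
    cur = conn.cursor()
    cur.execute("SELECT max_last_updated FROM etl_history "
                "WHERE workspace_name = %s", ('postgres_to_snowflake',))
    row = cur.fetchone()
finally:
    conn.close()

# Fall back to an early timestamp on the very first run
return str(row[0]) if row and row[0] else '2000-01-01 00:00:00'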


Perhaps you could also ask Snowflake?

https://support.snowflake.net/s/question/0D50Z00008lgahUSAQ/how-to-check-the-last-time-a-table-was-loaded-


I decided to go for a classic solution.

I created an admin table which holds, for each workspace, the maximum value of the increment field from the last run (initial value '2000-01-01 00:00:00').

 

At the beginning of each run, I use a reader to get this value from the table, then I use a FeatureReader to get all the rows from the source table that have changed since that value.

After writing the data with a Writer, I update the admin table with the new max value from the current run, which becomes the base value for the next run (a rough sketch of one way to track that max is below).
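
For anyone interested, one way to track the new max during the run is a PythonCaller along these lines (only a sketch; the class name and attribute names are illustrative, and it assumes last_updated arrives in a consistently sortable string format):

# Track the maximum last_updated seen during this run, and emit it as an
# extra feature that can be routed to the admin table writer.
import fmeobjects

class MaxTimestampTracker(object):
    def __init__(self):
        self.max_seen = None

    def input(self, feature):
        value = feature.getAttribute('last_updated')
        if value is not None and (self.max_seen is None or value > self.max_seen):
            self.max_seen = value
        self.pyoutput(feature)

    def close(self):
        # Emit one extra feature carrying the new max for the admin table
        if self.max_seen is not None:
            summary = fmeobjects.FMEFeature()
            summary.setAttribute('new_max_last_updated', self.max_seen)
            self.pyoutput(summary)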

 

Thank you all for your answers.

