I am trying to set one or more published workspace parameters to automate an ETL routine that reads multiple tables from one PostGIS database and incrementally updates a single table in a separate PostGIS database. The simple workflow is something like this:

a) Multiple tables from a PostGIS database are read and concatenated into a single table in a separate PostGIS database, representing one canonical theme of these records. Each theme classifies certain field collection survey records under a single, meaningful common theme.

b) The second database has a function that writes the id, feature_type and a run_date to an etl_log table each time the routine runs. I'd like to use this automated output as an input value each time an update routine is called on the server. I've tried to work in a SQL SELECT statement that filters only those records whose created_at or updated_at attribute is greater than (i.e. after) the run_date, labels those as "good", and passes them through to the write phase of the routine. I've only had success running it on single instances of tables, using FeatureMerger and TestFilter transformers.
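
As a rough sketch of that "good records" test, with a hypothetical table name (survey_points) and <last_run_date> standing in for the value read from etl_log:

    -- Untested sketch: treat a record as "good" if it was created or
    -- updated after the last logged run. GREATEST() in PostgreSQL ignores
    -- NULLs, so rows that were never updated still compare on created_at.
    SELECT *
    FROM   survey_points
    WHERE  GREATEST(created_at, updated_at) > '<last_run_date>'::timestamp;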

c) I want to automate the update process, run weekly or monthly, so that it updates only those feature records that have changed or are new since the last time the ETL routine ran. So it is a database updating process, but it requires a global parameter saved somewhere that can be read and applied when the next run of the routine is triggered (I think).

d) Where can I put a workspace parameter so this update routine extracts and loads into the table only those records dated after the last run date? I've researched the SQLExecutor, FeatureMerger, and InlineQuerier but can't quite find the solution.

e) I'm prototyping these workspaces in Desktop but will eventually publish to an FME Server instance after verifying the routine works correctly. There are several of these routines creating different classification themes in the production database, but the source survey tables all come from the same database.

I suspect you may get more answers if you post shorter and more specific questions, but here are some thoughts.

  • I usually end up using configuration files that are separate from the FME workspaces. The settings are read into the workspace using scripted private parameters (Python). This lets you parametrize the workspace differently for development, staging and production without having to hard-code values each time you publish the workspace. I use this for static settings.
  • FME's named database connections are fantastic, use them to your advantage.
  • If possible, a metadata table where you can save states etc between runs can be a big help. I use this for dynamic settings.
  • I fully agree about leveraging the updated_at attribute. Doing a full compare every time is very costly, performance-wise. Just remember that timestamps are essentially treated as strings in FME, so you'll have to cast them back to timestamps in the SQL when comparing against updated_at (see the sketch after this list).
  • As much as possible, prefer database joins over FeatureMergers. If you have to use FeatureMergers, try your best to un-block them (using Suppliers first). SQLCreators and/or SQLExecutors are my go-to transformers.
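
To illustrate that timestamp point, a minimal sketch, assuming a published parameter LAST_RUN_DATE holding the saved state and a placeholder table name:

    -- Untested sketch for an SQLCreator/SQLExecutor statement: FME hands
    -- the parameter over as a string, so cast it before comparing.
    SELECT *
    FROM   my_source_table
    WHERE  updated_at > '$(LAST_RUN_DATE)'::timestamp;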

Hopefully this can help you along.

David, thank you for the answer. Yes, I know my question was a bit wordy, but the process I'm mind-mapping seems a bit more complex than I anticipated. If I had to break it down to a core problem, it would be this:

- I require some environment (or global) variable, preferably an FME workspace or server parameter, that can be called as a value to apply in several different incremental database updates. Each time an incremental update is performed, this variable is updated as well, establishing a kind of looped process that is kicked off either by an automated email notice or manually. The update should write only those records that have changed since the last time an update was run, and it will have to track many records from several source tables in a single destination table. I thought the InlineQuerier was the answer, but it seems to operate only on tables from one database server rather than the two separate servers this routine needs to span.

In any case, you've provided some additional options for me to explore. Thank you.

The InlineQuerier is basically a glorified FeatureMerger that uses a temporary SQLite database to do joins. It has no access to external databases beyond the features that enter the transformer itself, so it may not be what you're looking for.

I'm starting to think that the name "InlineQuerier" is slightly misleading, actually.

Hi @tjpollard

For b) you can start your workspace with an SQLCreator that connects to the database and gets the max(run_date) from the etl_log table. This value can then be put into a VariableSetter. You can then have another stream starting with a Creator > VariableRetriever and going to a FeatureReader (PostGIS) with the WHERE clause set. In the FeatureReader, pick the features you want to read, and the WHERE clause will be applied to all of them (assuming they have a common last-updated date field).
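
For illustration, the SQLCreator query might be as simple as the sketch below; the feature_type value is a placeholder, and the exact WHERE clause syntax is an assumption you'll need to adapt:

    -- Hypothetical SQLCreator query: fetch the cutoff for this theme.
    SELECT max(run_date) AS last_run_date
    FROM   etl_log
    WHERE  feature_type = 'survey_theme';

    -- The FeatureReader WHERE clause can then reference the attribute
    -- restored by the VariableRetriever, along the lines of:
    --   updated_at > '@Value(last_run_date)'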

I'm attaching a workspace I have put together with some sample address data that has a date column: variablesetterretriever.fmw.

I hope this will give you a start for the approach in b).

Steve,

Thanks very much for the jump-start workspace. I'm new to FME, so this will really help me understand the many parts and pieces of building out a data workflow.

@SteveAtSafe, thanks for offering this as a start. I have a couple of questions concerning its implementation in a workflow:

1) Can I connect the VariableSetter output directly to the Supplier input of a FeatureMerger transformer?

2) Can I input multiple datasets into the Requestor input of the FeatureMerger transformer?

3) How does the Creator > VariableRetriever thread know to use run_date as the variable to retrieve?

4) Can a "SQL To Run After Write" statement, such as INSERT INTO xxx_catalog.etl_log(feature_type) VALUES ('<survey>');, work as a parameter on the destination database to keep the date current as the "last_updated" timestamp?
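
For instance, would an untested sketch like the following be the right shape, assuming run_date is not filled in by a column default and has to be written explicitly? (Names follow the etl_log structure described above.)

    -- Hypothetical "SQL To Run After Write" statement: log the run so the
    -- next incremental update can use max(run_date) as its cutoff.
    INSERT INTO xxx_catalog.etl_log (feature_type, run_date)
    VALUES ('<survey>', now());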

Thanks again.

Todd