Skip to main content
Question

Example usage of PythonCaller FMEFactoryPipeline

  • May 19, 2021
  • 2 replies
  • 83 views

Hoping someone could provide me with a comlplete example of how to use fmeobjects.FMEFactoryPipeline() in a PythonCaller

 

Specifically I would like to take points as in input and use the SurfaceModelFactory with the undocumented parameter MAXIMUM_EDGE_CALC_LENGTH

to hopefully produce a mesh with trimmed edges.

 

 

 

2 replies

  • Author
  • May 19, 2021

Unless I have made an error it appears that MAXIMUM_EDGE_CALC_LENGTH is not implemented as the below snippet removes 0 faces.

str_factory = """FACTORY_DEF * SurfaceModelFactory \
FACTORY_NAME SurfaceModelFactoryTest \
INPUT POINTS FEATURE_TYPE * \
TOLERANCE 0.0 \
MAXIMUM_EDGE_CALC_LENGTH 1.0 \
INTERPOLATION_TYPE AUTO \
OUTPUT TIN_SURFACE FEATURE_TYPE tin_surface
"""
 
pipeline = fmeobjects.FMEFactoryPipeline('TestFactory')
pipeline.addFactory(str_factory)
pipeline.processFeature(feature)
pipeline.allDone()
new_feature = pipeline.getOutputFeature()
 
print(f'Faces removed: {new_feature.getGeometry().numParts() - feature.getGeometry().numParts()}')

 


vlroyrenn
Enthusiast
Forum|alt.badge.img+14
  • Enthusiast
  • January 29, 2026

I can’t speak for the specific factory you were trying to use, but FMEFactoryPipeline isn’t documented well, so I’ll write what I know here for seachability’s sake.

I’ve not tried using a single string like that, but you can use a list of tokens, which is also easier to use if your factory parameters have some variable parts. You can see how this plays out with an excerpt from this question, which relies on AttributeKeeperFactory (see the doc for all other factories and functions):

if not hasattr(self, "factory_name"):
self.factory_name = "PythonCaller_3"

self.session = fmeobjects.FMESession()
attributes_to_keep = ["timestamp", "col_a", "col_b", "col_c", "col_d", "col_e"]
attributes_to_keep_encoded = ','.join([self.session.encodeToFMEParsableText(attr) for attr in attributes_to_keep])

self.keeper_factory = fmeobjects.FMEFactoryPipeline(self.factory_name + "_pipeline")
self.keeper_factory.addFactory([
"FACTORY_DEF", "*", "AttributeKeeperFactory",
"INPUT", "FEATURE_TYPE", "*",
"KEEP_ATTRS", attributes_to_keep_encoded,
"BUILD_FEATURE_TABLES", "YES",
"OUTPUT", "OUTPUT", "FEATURE_TYPE", self.factory_name + "_pipeline_OUTPUT"

 

Note the part where the text array has its elements reencoded into FME encoded text/FMEParsableText (i.e: text that is safe to embed in FME factory expressions and TCL functions, find of like escaping strings for SQL), which the factory documentation says is the expected format for this parameter.

Then once it’s created, you can do:

for row in src_data.iter_rows(named=True):
feat = fmeobjects.FMEFeature()
feat.setAttribute("timestamp", row["DateTime_UTC"])

# Either create your own features or pass incoming one to your pipeline

self.keeper_factory.processFeature(feat)

And once you’re done feeding them:

self.keeper_factory.allDone()

while True:
# In spite of what the doc says, getOutputFeature() does not raise
# when the pipeline is out of features but actually returns null.

out_feat = self.keeper_factory.getOutputFeature()
if out_feat is None:
break

self.pyoutput(out_feat)