I would like to be able to separate a feature into smaller chunks and process them 1 by 1

I have managed to use ModuloCounter to separate the feature but I am unsure how to get the rest of the workspace to process the broken up features 1 by 1.

If I send all the output into the PythonCaller, they are all executed together at the same time.

Any ideas?

a) You should be able to alter your Python code to process the features individually; posting the code, or at least the pseudo-structure, would help.

b) If you want one-by-one processing, you're better off with a Counter rather than a ModuloCounter.

c) You can wrap the PythonCaller in a custom transformer and set the Parallel Process Group By to your unique ID attribute (with or without actual parallel processing).

Also, what is the AttributeRangeFilter doing?



The AttributeRangeFilter is separating the incoming features into separate groups based on the number set by the ModuloCounter.


Hi @ashertbrooks,

It is not quite clear what exactly you want to do. Could you describe the problem in a bit more depth and provide the Python code? By default, a PythonCaller processes features one by one; making features enter the PythonCaller via different connection lines does not change that.



Except they are all connected to the same PythonCaller, so it's not actually doing anything.


Also, a bit of terminology that might help clarify responses: you have one dataset (the csv), and you have 38070 features; each one is a row in the csv. The arrows on each transformer are called ports; the lines between ports are connections.


Looking at the screenshot, the first feature (1st data row in the csv) gets assigned a value of 0 by the ModuloCounter, two attribute manipulations are performed on the feature (StringReplacers), and it is routed by the AttributeRangeFilter (ARF) through the first port, which is connected to the PythonCaller. The second feature gets a value of 1 from the ModuloCounter, undergoes the same manipulations, is filtered through the second port on the ARF, but ends up at the same PythonCaller.


My apologies. I want to batch rows of data into smaller chunks that are used later in an HTTPCaller. Basically I take a source file with approximately 50,000 records and 64 columns and create some JSON used to update a datastore; the issue is that, due to its size, the process of aggregating the columns and rows into JSON for the datastore takes a long time.


By breaking the 50,000 records into 10 chunks of 5,000 and processing them one after the other, it is much faster, and if it errors due to data formatting then I have only lost 10% of the data rather than the whole lot failing.



Thanks jdh, I am fairly new to FME and still getting familiar with the terminology.




If it helps, all the PythonCaller does is create the JSON in the format required later by the HTTPCaller:


tmp = '{"REGISTER_NAME" : "' + feature.getAttribute('REGISTER_NAME') + '", "ADV_NAME" : "' + feature.getAttribute("ADV_NAME") + '", and so on... + '"}'


feature.setAttribute("JSON_DATA", tmp) #json.dumps(tmp)
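As a side note, building the JSON string by hand breaks as soon as an attribute value contains a quote character; a dict passed to `json.dumps` handles quoting and escaping automatically. A minimal sketch (the attribute values here are made-up stand-ins for the `feature.getAttribute(...)` calls above):

```python
import json

# Hypothetical values standing in for feature.getAttribute(...) results
record = {
    "REGISTER_NAME": 'Main "North" Register',  # embedded quotes get escaped
    "ADV_NAME": "Example Pty Ltd",
    # ... and so on for the remaining columns
}

# json.dumps produces valid JSON, quoting keys and escaping values for us
tmp = json.dumps(record)

# inside a PythonCaller you would then do:
# feature.setAttribute("JSON_DATA", tmp)
```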


What I would suggest is using the JSONTemplater.



Would it run one group of features behind the other? The Python currently creates the JSON fine, but the issue here is that I need FME to process the 10 different groups created by the AttributeRangeFilter one after the other.


Hi @ashertbrooks

Thanks for providing more information. I think the Python code below will allow you to solve this problem without making use of the ModuloCounter.

What the code does is count how many features have already entered, and when this count reaches a certain number (5 in my case), it outputs a feature containing the JSON.

import fme
import fmeobjects

class FeatureProcessor(object):

    # Initialise some variables before the first feature enters
    def __init__(self):
        self.nbFeaturesProcessed = 0
        self.json = "{"

    # This code is executed every time a feature enters the PythonCaller
    def input(self, feature):

        # JSON keys must be double-quoted to be valid JSON
        self.json += '"CREATION_INSTANCE":{},'.format(feature.getAttribute('_creation_instance'))

        self.nbFeaturesProcessed += 1

        # Check whether 5 features have already been processed
        # P.S. You could turn the number 5 into a parameter ;)
        if self.nbFeaturesProcessed >= 5:
            self.export()

    # This method is called after the last feature has entered
    def close(self):

        # If some features were added but the amount is < 5,
        # we should make sure those are exported as well
        if self.nbFeaturesProcessed > 0:
            self.export()

    # Method that is called to export the created json
    def export(self):

        # Finalise the json by removing the last comma and adding the
        # curly bracket at the end
        json = "{}{}".format(self.json[:-1], '}')

        # Create a new feature having only one attribute containing the json
        newFeature = fmeobjects.FMEFeature()
        newFeature.setAttribute('json', json)
        self.pyoutput(newFeature)

        # Reset the variables to start over again
        self.nbFeaturesProcessed = 0
        self.json = "{"

I just created 10 features in a Creator and, as you can see, those are combined into 2 groups containing 5 features each.


If you still have questions, please ask!
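Outside of FME, the count-and-flush pattern in the class above can be exercised with plain Python; a minimal sketch (batch size 5, as in the answer) that turns 12 inputs into batches of 5, 5 and 2:

```python
BATCH_SIZE = 5   # same threshold as in the PythonCaller example

batches = []     # collects every flushed group
current = []     # features accumulated so far

def add(value):
    # mirrors input(): accumulate, then flush once the batch is full
    current.append(value)
    if len(current) >= BATCH_SIZE:
        flush()

def flush():
    # mirrors export(): emit the batch and reset the accumulator
    batches.append(list(current))
    current.clear()

def close():
    # mirrors close(): flush any leftover partial batch at the end
    if current:
        flush()

for i in range(12):
    add(i)
close()

print([len(b) for b in batches])  # [5, 5, 2]
```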

 



I had to create another answer since the number of characters in a comment is limited ;)



This is exactly what I was after! I will let you know how I go. 


Thanks jeroenstiers




Worked a treat @jeroenstiers, I just had to tweak it a bit so that each record of data was a separate array in the JSON.


Thanks again for the help




I am glad I could help!



Hi @ashertbrooks, if you want to make groups for every N features in the input order, you can add a group ID attribute to each feature, computed with this expression:

@int(@Count() / N)

Here, N indicates the number of features per group. You can also use the Counter transformer instead of the @Count function.

Then, this workflow should work as well as the Python script provided by @jeroenstiers.

Hope this helps.
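To illustrate how the expression behaves: `@Count()` yields 0, 1, 2, ... per feature, and integer division by N maps every run of N consecutive features to one group ID. A plain-Python sketch of the same grouping:

```python
N = 5  # features per group

# count stands in for @Count(); count // N is the @int(@Count() / N) expression
group_ids = [count // N for count in range(12)]

print(group_ids)  # [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2]
```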



Nice FME solution!



Thanks @takashi
