Solved

workspacerunner - how to 'catch' all items created


Badge +1

Hi all,

I ran into the following problem:

I have a workbench with a WorkspaceRunner that calls several child workspaces. Each child workspace creates a file: <xxx(number).dwg>. The run time differs per child workspace (some take seconds, some 15 minutes). Behind the WorkspaceRunner there is a FeatureHolder, followed by a custom transformer that combines a Decelerator (delaying each feature by 30 seconds) and a FileExistenceChecker in a loop. In the end all DWGs are collected and merged into one big file.

I would now like to speed things up. I have about 70 child workspaces, so I currently wait about 35 minutes for everything to pass through the Decelerator (70 x 30 seconds). I want to increase the number of child workspaces, but setting the delay below 30 seconds does not work, because some <xxxF(number).dwg> files are then missed when the big file is built at the end.

Setting the WorkspaceRunner's Wait for Job to Complete to 'Yes' loses the multicore speed. Setting it to 'No' lets me use the computer's full power, but once child workspace number 70 has been started, the main workspace continues regardless of the 7 instances that are still busy. What I miss is a way to force the WorkspaceRunner to wait until all jobs are complete.

Is there a better workaround here?

thanks

Dries

Best answer by takashi 26 January 2016, 02:22

12 replies

Badge +14

Could you send all the small jobs to a single WorkspaceRunner and, after that transformer, have your loop check for the last DWG in the sequence? Once it exists, you still need some way to tell whether the file has finished being written; that might just be your Decelerator, if you know how long the last job takes. Then add a second WorkspaceRunner inline to combine all the DWGs.
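
One way to guess whether a file has finished being written is to check that its size stops changing. The following is only a minimal sketch in plain Python (for example for a PythonCaller); the function name `is_size_stable` and its parameters are made up for illustration, and the check is a heuristic: a writer that pauses for longer than the polling window would still fool it.

```python
import os
import time


def is_size_stable(path, interval=2.0, checks=3):
    """Return True if the file exists and its size stays unchanged
    across `checks` polls taken `interval` seconds apart."""
    if not os.path.exists(path):
        return False
    last_size = os.path.getsize(path)
    for _ in range(checks):
        time.sleep(interval)
        size = os.path.getsize(path)
        if size != last_size:
            return False  # still growing; the caller should retry later
        last_size = size
    return True
```

A caller would typically loop, sleeping between calls, until the function returns True for the last DWG.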

Badge +22

My current workaround is to wrap the WorkspaceRunner in a custom transformer. Set the WorkspaceRunner to Wait for Job to Complete: Yes, but set up parallel processing on the custom transformer.

That way your features only leave the WorkspaceRunner when the child process is complete, but you can run up to 7 child processes at the same time (depending on your license and number of cores).

Note there will be issues if you have more than 500 groups in the parallel process.
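
For readers who think in code, the behaviour described above (each job blocks until done, but several jobs run side by side, and nothing continues until all have finished) resembles a bounded worker pool. This is only an analogy in plain Python, not how FME implements parallel processing; `run_all` and `run_job` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all(jobs, run_job, max_workers=7):
    """Call run_job(job) for every job, at most max_workers at a time.

    Returns the results in the same order as `jobs`, and only after
    every job has finished.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_job, jobs))
```

Here `run_job` would be whatever starts one child job and waits for it; that part is left out because it depends on the installation.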
Badge +14

Nice option. On this basis I've created this idea. https://knowledge.safe.com/content/idea/22570/workspacerunner-identify-when-all-child-processes.html

Badge +22

At the FME user conference, while onstage, I asked about a WorkspaceRunner option that combines Wait for Job to Complete: Yes with multiple concurrent processes, and Don promised it was in the works.

Userlevel 4
Badge +25

If you knew approximately how long the last set of files was going to run, you could take the feature from the Summary port, decelerate it by that length of time, then feed it into the FeatureHolder. That way you could at least remove the custom transformer/Decelerator combo.

Userlevel 2
Badge +17

I think @jdh's suggestion is a good solution. As another option, Python scripting might also be possible.

Assume that the initiator features for the WorkspaceRunner contain an attribute storing the destination DWG file path (e.g. "destdataset_acad"), and that the WorkspaceRunner passes this path to the child workspace through a published parameter (e.g. "DestDataset_ACAD").

In the child workspace, save the translation status into a text file with a Shutdown Python Script.

# Shutdown Python Script example (child workspace).
# Save translation status into a text file.
# Status text file path = destination DWG file path + ".txt", for example.
import fme
with open('%s.txt' % fme.macroValues['DestDataset_ACAD'], 'w') as f:
    f.write('success' if fme.status else 'failed')
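
A possible refinement, sketched under assumptions: if the watcher could see the status file before its content is fully written, the file can be written under a temporary name first and then renamed into place. The helper `write_status` and the ".tmp" suffix are hypothetical, and `os.replace` assumes Python 3.3 or later.

```python
import os


def write_status(dest_path, succeeded):
    """Write '<dest_path>.txt' so that it only appears once it is complete."""
    status_path = '%s.txt' % dest_path
    tmp_path = status_path + '.tmp'  # hypothetical temporary name
    with open(tmp_path, 'w') as f:
        f.write('success' if succeeded else 'failed')
    # Rename into place; the watcher never sees a half-written status file.
    os.replace(tmp_path, status_path)
    return status_path
```

In the shutdown script this would be called as write_status(fme.macroValues['DestDataset_ACAD'], fme.status).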

In the main workspace, wait for job completion with a PythonCaller that watches for the status files to be created.

# PythonCaller script example (main workspace): Wait for job completion.
import fmeobjects, os, time
class FeatureProcessor(object):
    def __init__(self):
        # List of destination DWG file paths.
        self.dpaths = []
        
    def input(self,feature):
        # Save a destination DWG file path.
        self.dpaths.append(feature.getAttribute('destdataset_acad'))
        
    def close(self):
        # List of status file paths.
        spaths = ['%s.txt' % path for path in self.dpaths]
        
        # Wait until all status files are created.
        while len([path for path in spaths if os.path.exists(path)]) < len(self.dpaths):
            time.sleep(1.0) # suspend execution. e.g. 1.0 seconds
            
        # Finally create and output a feature.
        # Optionally add attributes describing translation status to the feature.
        feature = fmeobjects.FMEFeature()
        for i, path in enumerate(spaths):
            with open(path) as f:
                feature.setAttribute('result{%d}.status' % i, f.read())
                feature.setAttribute('result{%d}.dwg' % i, self.dpaths[i])
            os.remove(path) # remove the status file.
        self.pyoutput(feature) 
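
Note that the waiting loop above never gives up, so it would hang if a child process were killed before its shutdown script ran. A minimal sketch of the same wait with a time limit follows; `wait_for_files` and its default values are assumptions for illustration, and `time.monotonic` needs Python 3.3 or later.

```python
import os
import time


def wait_for_files(paths, timeout=3600.0, poll=1.0):
    """Block until every path exists or `timeout` seconds have elapsed.

    Returns the list of paths that are still missing (empty on success).
    """
    deadline = time.monotonic() + timeout
    missing = [p for p in paths if not os.path.exists(p)]
    while missing and time.monotonic() < deadline:
        time.sleep(poll)
        missing = [p for p in missing if not os.path.exists(p)]
    return missing
```

The close() method could call this with the list of status file paths and treat any returned path as a failed or lost job.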
Userlevel 2
Badge +17

I think @jdh's suggestion is a good solution. As another option, Python scripting might also be possible.

Assume that the initiator features for the WorkspaceRunner contains an attribute storing the destination DWG file path (e.g. "destdataset_acad") and the DWG path will be passed to the child workspace through a published parameter (e.g. "DestDataset_ACAD") with the WorkspaceRunner.

In the child workspace, save the translation status into a text file with Shutdown Python Script.

# Shutdown Python Script example (child workspace).
# Save translation status into a text file.
# Status text file path = destination DWG file path + ".txt", for example.
import fme
with open('%s.txt' % fme.macroValues['DestDataset_ACAD'], 'w') as f:
    f.write('success' if fme.status else 'failed')

In the main workspace, wait for job completion with a PythonCaller, watching the status files creation.

# PythonCaller script example (main workspace): Wait for job completion.
import fmeobjects, os, time
class FeatureProcessor(object):
    def __init__(self):
        # List of destination DWG file paths.
        self.dpaths = []
        
    def input(self,feature):
        # Save a destination DWG file path.
        self.dpaths.append(feature.getAttribute('destdataset_acad'))
        
    def close(self):
        # List of status file paths.
        spaths = ['%s.txt' % path for path in self.dpaths]
        
        # Wait until all status files are created.
        while len([path for path in spaths if os.path.exists(path)]) < len(self.dpaths):
            time.sleep(1.0) # suspend execution. e.g. 1.0 seconds
            
        # Finally create and output a feature.
        # Optionally add attributes describing translation status to the feature.
        feature = fmeobjects.FMEFeature()
        for i, path in enumerate(spaths):
            with open(path) as f:
                feature.setAttribute('result{%d}.status' % i, f.read())
                feature.setAttribute('result{%d}.dwg' % i, self.dpaths[i])
            os.remove(path) # remove the status file.
        self.pyoutput(feature) 

If you need to preserve initiator features, this script does that.

# PythonCaller script example 2: Wait for job completion.
import fmeobjects, os, time
class FeatureProcessor2(object):
    def __init__(self):
        # List of input features.
        self.features = []
        
    def input(self, feature):
        # Save the input feature.
        self.features.append(feature)
        
    def close(self):      
        # Create list of status file paths.
        spaths = []
        for feature in self.features:
            spaths.append('%s.txt' % feature.getAttribute('destdataset_acad'))
        
        # Wait until all status files are created.
        while len([path for path in spaths if os.path.exists(path)]) < len(self.features):
            time.sleep(1.0) # suspend execution. e.g. 1.0 seconds
            
        # Finally output features
        for feature, path in zip(self.features, spaths):
            with open(path) as f:
                feature.setAttribute('status', f.read())
            os.remove(path) # remove the status file.
            self.pyoutput(feature) 
Badge +1

I would like to go for the parallel custom transformer option, but I can't get a parallel setup working around the custom transformer that holds the WorkspaceRunner. In my main workspace there is only a flow of 70 features (one for each child workspace to be launched). All the data processing is done in the child workspaces.

I wonder if it is even possible in my case. (I've gone through this article: https://knowledge.safe.com/articles/1211/parallel-processing.html)

Badge +22

The only thing in the custom transformer is a WorkspaceRunner. Is there only one child workspace (.fmw)?

If so, publish the parameters of the child workspace so that you can set them on the main canvas in the custom transformer, as if you were setting them directly in the WorkspaceRunner.

If you don't have an attribute to parallel process on, you can use a ModuloCounter.
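
The idea behind a modulo counter can be illustrated in a few lines of plain Python: each incoming item gets a group number that cycles through a fixed range, and that number can then serve as the attribute to parallel process on. This is only a rough illustration, not the transformer's implementation; `assign_groups` and `max_count` are made-up names.

```python
def assign_groups(items, max_count=7):
    """Pair each item with a group number cycling through 0..max_count-1."""
    return [(index % max_count, item) for index, item in enumerate(items)]
```

With 70 initiator features and 7 groups, each group would receive 10 features.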
Badge +1

Every child workspace gets created in the main workbench (with different parameters, of course), so there are multiple, all unique.

Parameters are published to the custom transformer (containing the runner).

I did create something similar to the ModuloCounter (I didn't know of that one).

Badge +22

Sorry, to clarify: are you calling the same workspace multiple times with different parameters each time, or are you calling multiple different workspaces?

If the latter, then you need one custom transformer per workspace being called.
Badge +1

As I reached a bit of a dead end with the other options, I tried the Python scripting approach with a couple of child workspaces, and it seems to be working. Thanks!
