Solved

workspacerunner - how to 'catch' all items created


ddecoene
Contributor

Hi all,

I ran into the following problem:

I have a workbench with a WorkspaceRunner that calls several child workspaces. Each child workspace creates a file: <xxx(number).dwg>. The duration of each child workspace is different (some take seconds, some 15 min). Behind the WorkspaceRunner there is a FeatureHolder followed by a custom transformer that combines a Decelerator (timing every 30 sec) and a FileExistenceChecker in a loop. In the end all DWGs are collected and merged into one big file.

My issue now is that I would like to speed things up. I have about 70 child workspaces, so right now I'm waiting about 35 min for everything to go through the Decelerator. (I wish to increase the number of child workspaces. Setting the timing to a value less than 30 sec is not good, because some <xxxF(number).dwg> files are then missed and do not end up in the big file.)

Setting the WorkspaceRunner to wait for job to complete makes me lose the multicore speed. Setting the value to 'No' lets me fully use the computer's power, but once child workspace number 70 is started, the main workspace continues regardless of the 7 instances that are still busy. I miss the ability to force the WorkspaceRunner to wait for all jobs to complete here.

Is there a better workaround here?

thanks

Dries

Best answer by takashi

I think @jdh's suggestion is a good solution. As another option, Python scripting might also be possible.

Assume that the initiator features for the WorkspaceRunner contain an attribute storing the destination DWG file path (e.g. "destdataset_acad"), and that the WorkspaceRunner passes the DWG path to the child workspace through a published parameter (e.g. "DestDataset_ACAD").

In the child workspace, save the translation status into a text file with a Shutdown Python Script.

# Shutdown Python Script example (child workspace).
# Save translation status into a text file.
# Status text file path = destination DWG file path + ".txt", for example.
import fme
with open('%s.txt' % fme.macroValues['DestDataset_ACAD'], 'w') as f:
    f.write('success' if fme.status else 'failed')
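
One caveat with a status-file handshake is that the watcher could notice the file before the writer has finished with it. A minimal sketch of one way to reduce that risk, assuming Python 3 and that the temporary and final files sit in the same folder: write under a temporary name, then rename into place. The helper name `write_status_atomically` and the `.tmp` suffix are hypothetical; in a Shutdown Python Script the two arguments would come from `fme.macroValues` and `fme.status` as in the script above.

```python
import os

def write_status_atomically(status_path, succeeded):
    """Write 'success' or 'failed' so the final file never appears half-written."""
    tmp_path = status_path + '.tmp'  # hypothetical temporary name
    with open(tmp_path, 'w') as f:
        f.write('success' if succeeded else 'failed')
    # os.replace renames in a single step and overwrites an existing file.
    os.replace(tmp_path, status_path)
```

The watcher then only ever sees the final name once the content is complete.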

In the main workspace, wait for job completion with a PythonCaller that watches for the creation of the status files.

# PythonCaller script example (main workspace): Wait for job completion.
import fmeobjects, os, time
class FeatureProcessor(object):
    def __init__(self):
        # List of destination DWG file paths.
        self.dpaths = []

    def input(self, feature):
        # Save a destination DWG file path.
        self.dpaths.append(feature.getAttribute('destdataset_acad'))

    def close(self):
        # List of status file paths.
        spaths = ['%s.txt' % path for path in self.dpaths]

        # Wait until all status files are created.
        while len([path for path in spaths if os.path.exists(path)]) < len(self.dpaths):
            time.sleep(1.0)  # suspend execution, e.g. 1.0 seconds

        # Finally create and output a feature.
        # Optionally add attributes describing translation status to the feature.
        feature = fmeobjects.FMEFeature()
        for i, path in enumerate(spaths):
            with open(path) as f:
                feature.setAttribute('result{%d}.status' % i, f.read())
                feature.setAttribute('result{%d}.dwg' % i, self.dpaths[i])
            os.remove(path)  # remove the status file.
        self.pyoutput(feature)
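
The waiting loop above never gives up, so it would block forever if a child process were killed before writing its status file. A minimal sketch of the same polling idea with a deadline, in plain Python so it can be tried outside FME. The function name `wait_for_files` and its default values are hypothetical, not part of any FME API.

```python
import os
import time

def wait_for_files(paths, timeout=3600.0, poll_interval=1.0):
    """Poll until every path exists or the timeout expires.

    Returns the list of paths that are still missing (empty on success).
    """
    deadline = time.monotonic() + timeout
    missing = [p for p in paths if not os.path.exists(p)]
    while missing and time.monotonic() < deadline:
        time.sleep(poll_interval)
        missing = [p for p in missing if not os.path.exists(p)]
    return missing
```

Inside `close()`, a non-empty return value could be logged or turned into a 'failed' status for the jobs concerned instead of waiting indefinitely.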

12 replies

davideagle
Contributor
  • Contributor
  • January 25, 2016

Can you perhaps just send all the small jobs to a single WorkspaceRunner and then, after that transformer, have your loop check for the last DWG in the sequence? Once it is found to exist, you need some way to tell whether that file has actually finished being written; that might just be your Decelerator here, if you know how long the last job takes. Then add a second WorkspaceRunner inline to combine all the DWGs.
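
One rough way to guess whether a file has finished being written is to check that its size stops changing. The sketch below is a heuristic only, under the assumption that the writer does not pause for longer than the chosen interval; the name `is_size_stable` and the default interval are hypothetical.

```python
import os
import time

def is_size_stable(path, interval=2.0):
    """Return True if the file exists and its size is unchanged after `interval` seconds."""
    if not os.path.exists(path):
        return False
    first_size = os.path.getsize(path)
    time.sleep(interval)
    return os.path.exists(path) and os.path.getsize(path) == first_size
```

An explicit completion marker written by the child job is more reliable than this kind of size check.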


jdh
Contributor
  • Contributor
  • January 25, 2016

My current workaround is to wrap the WorkspaceRunner in a custom transformer. Set the WorkspaceRunner to Wait for Job to Complete: Yes, but set up parallel processing on the custom transformer.

That way your features only leave the WorkspaceRunner when the child process is complete, but you can run up to 7 (depending on your license and number of cores) child processes at the same time.

Note there will be issues if you have more than 500 groups in the parallel process.
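
For comparison, the same pattern (a bounded number of concurrent child processes, continuing only once all have finished) can be sketched in plain Python outside Workbench. This is a different technique from the custom transformer setup described above, not a replacement for it. The function name `run_all` is hypothetical, and each entry in `commands` stands for whatever command line launches one child job; that command line is not shown here.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor

def run_all(commands, max_workers=7):
    """Run each command as a child process, at most `max_workers` at a time.

    Returns the exit codes in the same order as `commands`, and only
    returns once every child process has finished.
    """
    def run_one(command):
        return subprocess.run(command).returncode

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(run_one, commands))
```

A non-zero exit code in the returned list identifies a child job that did not finish cleanly.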

davideagle
Contributor
  • Contributor
  • January 25, 2016

Nice option. On this basis I've created this idea. https://knowledge.safe.com/content/idea/22570/workspacerunner-identify-when-all-child-processes.html


jdh
Contributor
  • Contributor
  • January 25, 2016
At the FME user conference, while onstage I asked about a workspace runner option for wait for job to complete Yes with multiple concurrent processes, and Don promised it was in the works.


mark2atsafe
Safer
  • Safer
  • January 25, 2016

You could, if you knew approximately how long the last set of files was going to run, use the feature from the summary point, decelerate it by that length of time, then feed it into the FeatureHolder. That way you could at least remove the custom transformer/Decelerator combo.


takashi
Supporter
  • January 26, 2016

If you need to preserve initiator features, this script does that.

# PythonCaller script example 2: Wait for job completion.
import fmeobjects, os, time
class FeatureProcessor2(object):
    def __init__(self):
        # List of input features.
        self.features = []

    def input(self, feature):
        # Save the input feature.
        self.features.append(feature)

    def close(self):
        # Create list of status file paths.
        spaths = []
        for feature in self.features:
            spaths.append('%s.txt' % feature.getAttribute('destdataset_acad'))

        # Wait until all status files are created.
        while len([path for path in spaths if os.path.exists(path)]) < len(self.features):
            time.sleep(1.0)  # suspend execution, e.g. 1.0 seconds

        # Finally output features, each with its translation status.
        for feature, path in zip(self.features, spaths):
            with open(path) as f:
                feature.setAttribute('status', f.read())
            os.remove(path)  # remove the status file.
            self.pyoutput(feature)

ddecoene
Contributor
  • Author
  • Contributor
  • January 26, 2016
jdh wrote:

My current workaround is to wrap the WorkspaceRunner in a custom transformer. Set the WorkspaceRunner to Wait for Job to Complete: Yes, but set up parallel processing on the custom transformer.

That way your features only leave the WorkspaceRunner when the child process is complete, but you can run up to 7 (depending on your license and number of cores) child processes at the same time.

Note there will be issues if you have more than 500 groups in the parallel process.

I would like to go for this option, but I can't get a parallel setup around the custom transformer with the WorkspaceRunner. In my main workspace there is only a flow of 70 entities (one for each child workspace to be launched). All the data computing is done in the child workspaces.

I wonder if it is even possible in my case. (I've gone through this article: https://knowledge.safe.com/articles/1211/parallel-processing.html )


jdh
Contributor
  • Contributor
  • January 26, 2016

The only thing in the custom transformer is a WorkspaceRunner. Is there only one child workspace (.fmw)?

If so, publish the parameters of the child workspace so that you can set them on the main canvas in the custom transformer as if you were setting them directly in the WorkspaceRunner.

If you don't have an attribute to parallel process on, you can use a ModuloCounter.
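
To illustrate the idea behind a modulo counter as a grouping attribute: each incoming feature gets a number that cycles through a fixed range, so the features are spread evenly over that many groups. A minimal sketch in plain Python; the function name `assign_groups` is hypothetical, and starting the count at 0 is an assumption for the example, not a statement about the transformer's behaviour.

```python
def assign_groups(items, group_count=7):
    """Pair each item with a group number cycling 0, 1, ..., group_count - 1."""
    return [(index % group_count, item) for index, item in enumerate(items)]

# Ten hypothetical jobs spread over three groups.
groups = assign_groups(['job%d' % n for n in range(10)], group_count=3)
```

Keeping `group_count` small also stays well below the 500-group limit mentioned earlier in the thread.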

ddecoene
Contributor
  • Author
  • Contributor
  • January 26, 2016

Every child workspace gets created in the main workbench (with different parameters, of course), so there are multiple (all unique).

Parameters are published to the custom transformer (containing the runner).

I did create something similar to the ModuloCounter (I didn't know of that one).


jdh
Contributor
  • Contributor
  • January 26, 2016

Sorry, to clarify: are you calling the same workspace multiple times with different parameters each time, or are you calling multiple different workspaces?

If the latter, then you need one custom transformer per workspace being called.

ddecoene
Contributor
  • Author
  • Contributor
  • January 27, 2016

As I reached a bit of a dead end with the other options, I tried takashi's Python approach with a couple of child workspaces, and it seems to be working. Thanks!

