
I've created a simple workspace that streams tweets, performs a point-in-polygon test, and writes the results to a CSV file. The TweetStreamer is a perpetual streamer, i.e. it requires manual intervention to stop.

The problem is that the CSV writer appears to flush all writes only after the workspace completes normally. If the user manually stops the workspace (as the TweetStreamer requires), the CSV writes are not flushed and data is lost. Is there any way to automatically flush the CSV output after each record (or is there another workaround)?

Hi,

Is it a requirement to write to the same CSV? If not, you could use a timestamp and write to a new CSV each time.


Thanks for your response, though I'm not sure I quite understood:

I tried using the tweet creation date as the CSV filename, essentially creating a new file per tweet. This has the effect of:

1) Creating a new file per tweet, potentially producing thousands if not millions of data files after a substantial harvest. (Not really ideal.)

2) Flushing still appears to be a problem. After running for a few minutes and stopping the workspace, I'm left with a series of empty files.


I'm not sure that's possible as FME does not seem to flush the file buffer for each line it writes (which is a good thing, performance-wise).

You can accomplish what you need with Python, however, e.g. by using the following code in a PythonCaller:

import fmeobjects

class FeatureProcessor(object):
    def __init__(self):
        # FME_MacroValues exposes published parameters; open the output file in append mode
        self.filehandle = open(FME_MacroValues['OUTPUT_FILE'], 'a')

    def input(self, feature):
        text_line_data = feature.getAttribute('text_line_data')
        if text_line_data:
            self.filehandle.write(text_line_data + '\n')
            self.filehandle.flush()  # push each line to disk immediately
        self.pyoutput(feature)

    def close(self):
        self.filehandle.close()

You will have to assemble your CSV line yourself, however, and put the result into the attribute "text_line_data" before the PythonCaller.
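
For example, a second PythonCaller placed upstream could build that attribute. This is only a minimal sketch; the tweet attribute names ('created_at', 'user_name', 'tweet_text') are assumptions, so substitute whatever your features actually carry:

import csv
import io

class CsvLineBuilder(object):
    def input(self, feature):
        # Hypothetical attribute names - replace with the ones on your tweet features
        values = [feature.getAttribute('created_at'),
                  feature.getAttribute('user_name'),
                  feature.getAttribute('tweet_text')]
        # Let the csv module handle quoting, so commas and quotes in tweet text are escaped
        buffer = io.StringIO()
        csv.writer(buffer).writerow(values)
        feature.setAttribute('text_line_data', buffer.getvalue().rstrip('\r\n'))
        self.pyoutput(feature)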

You can change the 'a' to 'w' in the open() call if you want to reset the file every time the workspace restarts; otherwise it will append to an existing file.

You will also need to create a published parameter OUTPUT_FILE that will contain the full path and name of your output file.

If you want the FME translation log to contain the number of lines written (statistics), you can connect a NULL writer to the output port of the PythonCaller.


Thanks David, that workaround makes sense.

From a wider viewpoint, perhaps FME should auto-flush CSV files if the workspace is stopped manually.

I agree, it could be an option. Feel free to make a submission on the "Ideas" board!

No problem, I guess I didn't understand you either....

Idea added.

I guess the other solution would be to write to a format that lets you commit each record individually - maybe SQLite? - then run a post-translation workspace to convert that format to CSV.

Still only a workaround for a problem I guess we didn't anticipate!

I'm not sure we would auto-flush those formats. Seems to me that in most cases if I stop a workspace it's because I *don't* want to write any data. Or at least, if we did add this it would be optional.
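
To make the SQLite idea concrete, here is a rough sketch of the staging pattern outside FME. The table and column names are just assumptions for illustration; the point is that SQLite can commit each row individually, so a manual stop loses nothing:

import csv
import sqlite3

# Stage each record in SQLite, committing per row so a manual stop loses nothing
conn = sqlite3.connect('tweets_staging.db')
conn.execute('CREATE TABLE IF NOT EXISTS tweets (created_at TEXT, user_name TEXT, tweet_text TEXT)')

def stage_record(record):
    conn.execute('INSERT INTO tweets VALUES (?, ?, ?)', record)
    conn.commit()  # the row is durable as soon as this returns

# Post-translation step: dump whatever made it into the database out to CSV
def dump_to_csv(path):
    with open(path, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(['created_at', 'user_name', 'tweet_text'])
        writer.writerows(conn.execute('SELECT * FROM tweets'))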


The other, other solution would be to set up a conditional transformer that passes a feature to a Terminator when you don't want the workspace to run any more.

I don't know the scenario or why you press the stop button when you do, but if that condition can be encoded in a Tester, with the feature sent to a Terminator if it passes, then that does seem to write the CSV contents up to that point. I just tested it by creating 100 features and sending them to a CSV writer, with the 51st feature hitting a Terminator; I can see the 50 features up to that point are written to the CSV.

So, for example, you could count the tweets coming out of the TweetStreamer and stop after 100 tweets.

Or you could set up an external trigger - for example, the existence of a file, or the contents of a simple text file that is either Y or N - and stop on that basis. Then you terminate the workspace by editing the file contents to "Y". Not ideal, but it would work.
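
One way to encode that file-based trigger is a PythonCaller that checks for a stop file on every feature. This is only a sketch: the path is a hypothetical placeholder, and raising FMEException ends the translation in roughly the same way a Terminator does:

import os
import fmeobjects

class StopFileMonitor(object):
    # Hypothetical path - create this file when you want the harvest to end
    STOP_FILE = r'C:\temp\stop_tweets.txt'

    def input(self, feature):
        if os.path.exists(self.STOP_FILE):
            # Raising ends the translation, similar to a Terminator transformer
            raise fmeobjects.FMEException('Stop file found - ending tweet harvest')
        self.pyoutput(feature)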

Can you check to see if the Terminator causes the file to flush correctly in your scenario?


Mark, I think the issue is that you cannot really use the CSV or Text File writer to write something like a running log, because the file contents aren't flushed unless the workspace terminates successfully. So if you have a Server workspace that works with e.g. websockets, where you typically cancel the job rather than let it finish, you risk your text file ending up empty or incomplete.

It would be really cool if there was an option on these writers to set how often to flush the buffer, something like a "transaction size". Or maybe introduce a separate "Log Text File" writer that automatically flushes after each feature.

