Solved

TweetStreamer and CSV file flushing



I've created a simple workspace that streams tweets, does a point-in-polygon test and writes the results to a CSV file. The TweetStreamer is a perpetual streamer, i.e. it requires manual intervention to stop.

The problem is that the CSV writer appears to flush all the writes only after correct completion of the workspace. If the user manually stops the workspace (as the TweetStreamer requires), the CSV writes are not flushed and data is lost. Is there any way to automatically flush the CSV output after each record (or another workaround)?

Best answer by david_r (see his reply below).

10 replies

itay
  • Supporter
  • September 8, 2016

Hi,

Is it a requirement to write to the same CSV? If not, you could use a timestamp and write to a new CSV each time.


turner537
  • Author
  • September 8, 2016
itay wrote:

Is it a requirement to write to the same CSV? If not, you could use a timestamp and write to a new CSV each time.

Thanks for your response, though I'm not sure I quite understand:

I tried using the tweet creation date as the CSV filename, essentially creating a new file per tweet. This has the effect of:

1) creating a new file per tweet, thus potentially creating thousands if not millions of data files after a substantial harvest (not really ideal);

2) flushing still appears to be a problem: after running for a few minutes and stopping the workspace, I'm left with a series of empty files.

david_r
Celebrity
  • Best Answer
  • September 8, 2016

I'm not sure that's possible as FME does not seem to flush the file buffer for each line it writes (which is a good thing, performance-wise).

You can accomplish what you need with Python, however, e.g. by using the following code in a PythonCaller:

import fmeobjects

class FeatureProcessor(object):
    def __init__(self):
        # Open the target file in append mode; OUTPUT_FILE is a published parameter
        self.filehandle = open(FME_MacroValues['OUTPUT_FILE'], 'a')

    def input(self, feature):
        # Write the pre-assembled CSV line and flush it to disk immediately,
        # so nothing is lost if the workspace is stopped manually
        text_line_data = feature.getAttribute('text_line_data')
        if text_line_data:
            self.filehandle.write(text_line_data + '\n')
            self.filehandle.flush()
        self.pyoutput(feature)

    def close(self):
        # Release the file handle when the translation ends
        self.filehandle.close()

You will have to assemble your CSV line yourself, however, and put the result into the attribute "text_line_data" before the PythonCaller.
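For reference, one way to assemble that line is with Python's standard csv module and an io.StringIO buffer, which handles quoting and embedded commas for you. This is a hedged sketch outside of FME; in the workspace you would run something like it (in the same or an earlier PythonCaller) and put the result into "text_line_data":

```python
import csv
import io

def make_csv_line(values):
    """Build a single, properly quoted CSV line from a list of values."""
    buffer = io.StringIO()
    # lineterminator='' so no newline is appended; the writer adds '\n' itself
    csv.writer(buffer, lineterminator='').writerow(values)
    return buffer.getvalue()

# Quoting is handled automatically, including embedded commas:
line = make_csv_line(['2016-09-08', 'hello, world', 42])
# -> '2016-09-08,"hello, world",42'
```

This spares you from hand-escaping tweet text that contains commas, quotes or newlines.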

You can change the 'a' to 'w' in the open() call if you want to reset the file every time the workspace runs; otherwise it will append to an existing file.

You will also need to create a published parameter OUTPUT_FILE that will contain the full path and name of your output file.

If you want the FME translation log to contain the number of lines written (statistics), you can connect a Null writer to the output port of the PythonCaller.


turner537
  • Author
  • September 8, 2016
david_r wrote:

I'm not sure that's possible as FME does not seem to flush the file buffer for each line it writes […]

Thanks David, that workaround makes sense.

From a wider viewpoint, perhaps FME should auto-flush CSV files if the workspace is stopped manually.


david_r
Celebrity
  • September 8, 2016
turner537 wrote:
Thanks David, that workaround makes sense. From a wider viewpoint, perhaps FME should auto-flush CSV files if the workspace is stopped manually.
I agree, it could've been an option. Feel free to make a submission on the "Ideas" board!

itay
  • Supporter
  • September 8, 2016
turner537 wrote:
Thanks for your response, though I'm not sure I quite understand: […]

No problem, I guess I didn't understand you either...


turner537
  • Author
  • September 8, 2016
david_r wrote:
I agree, it could've been an option. Feel free to make a submission on the "Ideas" board!
Idea added.


mark2atsafe
  • Safer
  • September 8, 2016

I guess the other solution would be to write to a format that lets you commit each record individually - maybe SQLite? - then run a post-translation workspace to convert that format to CSV.

Still only a workaround for a problem I guess we didn't anticipate!

I'm not sure we would auto-flush those formats. Seems to me that in most cases if I stop a workspace it's because I *don't* want to write any data. Or at least, if we did add this it would be optional.
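The per-record-commit idea above can be sketched in plain Python with the standard sqlite3 module (the database path, table and column names here are made up for illustration, not part of any FME format). Each commit makes that row durable, so records survive a hard stop of the harvesting process:

```python
import os
import sqlite3
import tempfile

# Staging database on disk (a real workspace would use a fixed, configured path)
db_path = os.path.join(tempfile.gettempdir(), 'tweets.db')
conn = sqlite3.connect(db_path)
conn.execute('CREATE TABLE IF NOT EXISTS tweets (created TEXT, body TEXT)')

def store_tweet(created, body):
    """Insert one record and commit immediately so it survives a hard stop."""
    conn.execute('INSERT INTO tweets VALUES (?, ?)', (created, body))
    conn.commit()  # flushes this row to disk right away

store_tweet('2016-09-08T12:00:00', 'example tweet')
```

A second, ordinary workspace could then read the SQLite table and write the final CSV whenever you like.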


mark2atsafe
  • Safer
  • September 8, 2016

The other, other solution would be to set up a conditional transformer that passes a feature to a Terminator when you don't want the workspace to run any more.

I don't know the scenario and why you press the stop button when you do, but if that condition can be encoded in a Tester, and sent to a Terminator if it passes, then that does seem to write the CSV contents up to that point. I just tested it by creating 100 features, sending them to a CSV writer, but the 51st feature hits a Terminator. I can see the 50 features up to that point are written to the CSV.

So, for example, you could count the tweets coming out of the TweetStreamer and stop after 100 tweets.

Or you could set up an external trigger - for example the existence of a file or read the contents of a simple text file that are either Y or N - and stop on that basis. Then you terminate the workspace by editing the file contents to "Y". Not ideal, but it would work.
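The external-trigger check is only a few lines of logic (the path and the "Y" flag value are arbitrary choices for this sketch; in FME the result would feed a Tester and then a Terminator):

```python
import os

def should_stop(flag_path):
    """Return True when the trigger file exists and starts with 'Y' (any case)."""
    if not os.path.exists(flag_path):
        return False
    with open(flag_path) as f:
        return f.read().strip().upper().startswith('Y')
```

You would evaluate this once per incoming tweet, so editing the file to "Y" stops the workspace on the next feature and lets the writers finish cleanly.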

Can you check to see if the Terminator causes the file to flush correctly in your scenario?


david_r
Celebrity
  • September 8, 2016

Mark, I think the issue is that you cannot really use the CSV or the Text File writer to write something like a running log, because the file contents aren't flushed unless the workspace terminates successfully. So if you have a Server workspace that does stuff with e.g. websockets where you typically cancel the job rather than let it finish, you risk that your text file is empty or incomplete.

It would be really cool if there was an option on these writers to set how often to flush the buffer, something like a "transaction size". Or maybe introduce a separate "Log Text File" writer that automatically flushes after each feature.

