Write attributes into the content

tzhu
Hi,

I want to know whether I can access both a flowfile's attributes and its
content and write them into a new flowfile.

I have some log files stored in folders named after clients. My task is to
process the log file contents and write the result to a SQL table. I also
need to include the client name as one of the columns in the SQL table, and
that information is only available from the file path. To process the
contents, I know I can use ExecuteScript and write a custom script with a
StreamCallback. But when I use getAttribute to access the filename and write
it into a flowfile, it does not seem to work. Any help or ideas?
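Since the client name lives only in the directory structure, one option is to read it from a path attribute rather than from the file content. This is a minimal sketch under two assumptions: the files are picked up by GetFile with Recurse Subdirectories enabled, which (per its documentation) sets a "path" attribute holding the file's directory relative to the Input Directory (for example "acme" or "acme/2017/11"); and the first folder below the input directory is the client name. The helper client_name_from_path is hypothetical. It is plain Python, so it should also run under Jython 2.7 inside ExecuteScript, e.g. client_name_from_path(flowFile.getAttribute('path')).

```python
def client_name_from_path(relative_path):
    """Return the first folder of a relative path such as 'acme/2017/11'.

    Hypothetical helper: assumes the client folder sits directly below
    the processor's input directory.
    """
    if not relative_path:
        return None  # attribute missing or empty
    # Normalise Windows separators, then drop empty and '.' segments.
    parts = [p for p in relative_path.replace("\\", "/").split("/")
             if p not in ("", ".")]
    if not parts:
        return None  # file sat directly in the input directory
    return parts[0]


if __name__ == "__main__":
    for sample in ("acme", "acme/2017/11/", "./"):
        print(repr(sample), "->", client_name_from_path(sample))
```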

Thanks,

Tina



--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/

Re: Write attributes into the content

Jeff
Hello Tina,

Can you provide more detail about what your script is doing?  Are you
encountering an issue getting the filename for a flowfile, or in setting
the filename attribute on the new flowfile you are creating in your
script?  Could you post a link somewhere to your script?

On Fri, Nov 24, 2017 at 10:39 AM tzhu <[hidden email]> wrote:

> I want to know if I can access both attributes and contents and write them
> in a new flowfile.
> [...]

Re: Write attributes into the content

tzhu
The script that processes the content of the log file:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        disconnected = text.count("Lost connection to server")
        connected = text.count("Established connection to server")
        newFile = 'INSERT INTO [TEST].[db_datawriter].[Test1] \
        (TOTAL_DISCONNECTIONS,TOTAL_CONNECTIONS)\
        VALUES (%d, %d)' % (disconnected, connected)
        outputStream.write(newFile)


flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
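The counting step in the first script can be checked outside NiFi before it goes back into a StreamCallback. This sketch pulls the logic of process() into plain functions (connection_counts and build_counts_insert are hypothetical names) and builds an equivalent INSERT statement without the backslash continuations inside the string literal. Note that str.count counts every non-overlapping occurrence, so a line containing a marker twice is counted twice.

```python
DISCONNECT_MARKER = "Lost connection to server"
CONNECT_MARKER = "Established connection to server"


def connection_counts(text):
    """Return (disconnections, connections) found in one log file's text."""
    # str.count counts non-overlapping occurrences anywhere in the text.
    return text.count(DISCONNECT_MARKER), text.count(CONNECT_MARKER)


def build_counts_insert(disconnected, connected):
    """Build an INSERT equivalent to the one the first script writes."""
    # Adjacent string literals are joined before % is applied.
    return ("INSERT INTO [TEST].[db_datawriter].[Test1] "
            "(TOTAL_DISCONNECTIONS, TOTAL_CONNECTIONS) "
            "VALUES (%d, %d)" % (disconnected, connected))


if __name__ == "__main__":
    sample = ("10:00 Established connection to server\n"
              "10:05 Lost connection to server\n"
              "10:06 Established connection to server\n")
    print(build_counts_insert(*connection_counts(sample)))
```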



The new script I want to use to write the filename:

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass
    def process(self, inputStream, outputStream):
        fname = flowFile.getAttribute('filename')
        newFile = 'INSERT INTO [TEST].[db_datawriter].[Test1] \
        (CLIENT_NAME) VALUES (%d)' % fname
        outputStream.write(newFile)

flowFile = session.get()
if(flowFile != None):
    flowFile = session.write(flowFile, PyStreamCallback())
session.transfer(flowFile, REL_SUCCESS)
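Apart from the placement of session.transfer, the second script has a formatting problem: '%d' expects a number, so applying it to the filename string raises a TypeError in Python, and a string column needs '%s' plus SQL quoting. A cleaner way to combine an attribute with the content is to read the attribute before calling session.write() and pass it to the callback's constructor, so process() only deals with the streams. The sketch below shows that pattern under plain CPython, with io.BytesIO standing in for NiFi's streams. ClientCountsCallback and sql_string_literal are hypothetical names, and the sketch assumes Test1 has CLIENT_NAME, TOTAL_DISCONNECTIONS and TOTAL_CONNECTIONS columns. In ExecuteScript the class would subclass org.apache.nifi.processor.io.StreamCallback and read the input with IOUtils.toString as the original does, and would be used roughly as session.write(flowFile, ClientCountsCallback(flowFile.getAttribute('filename'))).

```python
import io


def sql_string_literal(value):
    """Quote a value as a SQL string literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


class ClientCountsCallback(object):
    """Mirrors the shape of NiFi's StreamCallback: process(in, out).

    The attribute value arrives through the constructor, so process()
    works only with the streams and never touches the flowfile.
    """

    def __init__(self, client_name):
        self.client_name = client_name

    def process(self, input_stream, output_stream):
        text = input_stream.read().decode("utf-8")
        disconnected = text.count("Lost connection to server")
        connected = text.count("Established connection to server")
        statement = ("INSERT INTO [TEST].[db_datawriter].[Test1] "
                     "(CLIENT_NAME, TOTAL_DISCONNECTIONS, TOTAL_CONNECTIONS) "
                     "VALUES (%s, %d, %d)"
                     % (sql_string_literal(self.client_name),
                        disconnected, connected))
        # Encode explicitly: the output stream takes bytes, not text.
        output_stream.write(bytearray(statement.encode("utf-8")))


if __name__ == "__main__":
    source = io.BytesIO(b"Lost connection to server\n"
                        b"Established connection to server\n")
    sink = io.BytesIO()
    ClientCountsCallback("acme").process(source, sink)
    print(sink.getvalue().decode("utf-8"))
```

Doubling single quotes handles names like O'Brien, but if client names can contain arbitrary text, PutSQL's parameterized form (? placeholders with sql.args.N.type and sql.args.N.value attributes) avoids building literals by hand.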



The second script fails with this error:
<http://apache-nifi-developer-list.39713.n7.nabble.com/file/t792/error.jpg>

Any idea or help is appreciated!




Re: Write attributes into the content

Jeff
Tina,

I'm not the greatest Python/Jython developer, nor have I used ExecuteScript
much, but there's a utility [1] that Matt Burgess created to test scripts
that you'd run in ExecuteScript.  Have you had a chance to try running your
script with the utility to debug it?

One thing I noticed is that the last line of your script
(session.transfer(flowFile, REL_SUCCESS)) is not indented, so it runs even
when flowFile is None (null).  A processor can be triggered even when no
flowfiles are available for it to process, in which case session.get()
returns nothing.  I think that's what's causing your error.  Running your
current script through nifi-script-tester, I can reproduce the
NullPointerException.  After indenting that line so it is part of the
preceding if-statement, nifi-script-tester finishes successfully.
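The guard described above can be illustrated without NiFi. In this minimal sketch, a hypothetical FakeSession stands in for ProcessSession: get() returns None when nothing is queued, just as ProcessSession.get() returns null, and transfer() refuses a missing flowfile. Per the reproduction above, the real session fails with a NullPointerException instead. The handler keeps the transfer inside the if-block, so a trigger with an empty queue does nothing.

```python
REL_SUCCESS = "success"  # stand-in for the relationship ExecuteScript provides


class FakeSession(object):
    """Hypothetical stand-in for NiFi's ProcessSession (illustration only)."""

    def __init__(self, queued):
        self.queued = list(queued)
        self.transferred = []

    def get(self):
        # Like ProcessSession.get(): nothing queued means no flowfile.
        return self.queued.pop(0) if self.queued else None

    def transfer(self, flow_file, relationship):
        # The real session throws a NullPointerException here instead.
        if flow_file is None:
            raise ValueError("cannot transfer a missing flowfile")
        self.transferred.append((flow_file, relationship))


def on_trigger(session):
    flow_file = session.get()
    if flow_file is not None:
        # session.write(...) would go here in the real script.
        session.transfer(flow_file, REL_SUCCESS)  # only runs with a flowfile


if __name__ == "__main__":
    empty = FakeSession([])
    on_trigger(empty)  # triggered with nothing queued: no error, no transfer
    busy = FakeSession(["log-1"])
    on_trigger(busy)
    print(empty.transferred, busy.transferred)
```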

Try cloning Matt's nifi-script-tester, build it with Gradle, and create a
script.py file containing your script.  Then run nifi-script-tester:
java -jar build/libs/nifi-script-tester-1.2.0-all.jar script.py

You'll see a NullPointerException.

Indent the "session.transfer..." line so that it's part of the
if-statement, and run nifi-script-tester again.  You should see:
Flow Files transferred to success: 0

The next step would be to feed some of your data into the script.  Running
nifi-script-tester without any parameters prints usage information.  You
can use the "-input" parameter to provide a directory of test data to run
through your script.
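To have data to feed in, one approach is to generate a few small log files with known contents so the expected counts are obvious. This minimal sketch runs with ordinary Python 3 outside NiFi; SAMPLES and write_sample_logs are hypothetical. It writes flat files named after clients (acme.log, globex.log), since this thread does not say whether the tester recurses into subdirectories or which attributes it sets on each file. The tester's usage output is the authority on the exact form of the -input option.

```python
import os
import tempfile

# Hypothetical sample data: client name -> log lines.
SAMPLES = {
    "acme": ["Established connection to server",
             "Lost connection to server",
             "Established connection to server"],
    "globex": ["Established connection to server"],
}


def write_sample_logs(directory, samples):
    """Write one '<client>.log' file per client into `directory`."""
    os.makedirs(directory, exist_ok=True)
    paths = []
    for client, lines in sorted(samples.items()):
        path = os.path.join(directory, client + ".log")
        with open(path, "w", encoding="utf-8") as handle:
            handle.write("\n".join(lines) + "\n")
        paths.append(path)
    return paths


if __name__ == "__main__":
    target = tempfile.mkdtemp(prefix="nifi-test-input-")
    for written in write_sample_logs(target, SAMPLES):
        print(written)
```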

[1] https://github.com/mattyb149/nifi-script-tester

On Sat, Nov 25, 2017 at 6:31 PM tzhu <[hidden email]> wrote:

> The script to process the content of log file:
> [...]