Issue with Executescript


Issue with Executescript

Vyshali
Hi,

I'm using the ExecuteScript processor to generate fake data with the "Faker"
package and substitute it for the original data. I have included the script
below for your reference.

import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import unicodecsv as csv
from faker import Factory
from collections import defaultdict

class TransformCallback(StreamCallback):
    def _init_(self):
        pass

    def process(self,inputStream,outputStream):
        text = IOUtils.toString(inputStream,StandardCharsets.ISO_8859_1)
        faker  = Factory.create()            # generating fake data
        names  = defaultdict(faker.name)
        emails = defaultdict(faker.email)
        ssns = defaultdict(faker.ssn)
        phone_numbers = defaultdict(faker.phone_number)

        for row in text.splitlines():  
            row["name"]  = names[row["name"]]     # assigning the fake data
            row["email"] = emails[row["email"]]
            row["ssn"] = ssns[row["ssn"]]
            row["phone_number"] = phone_numbers[row["phone_number"]]
            flowFile = session.putAttribute(flowFile,"name",row["name"])

        outputStream.write(text.encode('UTF8'))


flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile,TransformCallback())
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

But I'm unable to execute it successfully. I'm getting the following error:
"ProcessException:TypeError:None required"

I'm not very familiar with Python. Please suggest how I can solve this, and
correct me if my code is not appropriate either.

Regards,
Vyshali




--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/

Re: Issue with Executescript

Andy Christianson-2
Vyshali,

These types of issues can be difficult to diagnose in a running NiFi instance. Once your script grows beyond a certain size, I recommend creating unit tests for it and mocking the NiFi interfaces. MattyB's script tester [1] may help you with this task as well as with general script testing.

At first glance, something looks wrong with your putAttribute call.

On a final note, if you're not familiar with Python, are there other languages you are more familiar with? ExecuteScript supports several scripting languages (Groovy, JavaScript and Ruby via JRuby, among others), so if you are more comfortable with another supported language, I recommend porting to that. Between using a language you're more comfortable with and using MattyB's script tester, I'm confident we should be able to resolve the issue.

Regards,

Andy

P.S. I would consider this more of a user question than a developer question, so next time please email [hidden email] rather than the dev list.

[1]: http://funnifi.blogspot.com/2017/10/release-120-of-nifi-script-tester.html?m=1
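
A minimal sketch of the unit-testing idea above, assuming the transformation logic is moved into a plain function so that it can run outside NiFi. The names anonymize_lines, FakeFlowFile, FakeSession and run_script are hypothetical and invented for illustration; they are not part of NiFi or of the script tester, and the fakes cover only get, putAttribute and transfer, whereas a real script would go through session.write with a StreamCallback.

```python
import unittest


def anonymize_lines(text, replace):
    """Apply `replace` to the first comma-separated field of every line."""
    out = []
    for line in text.splitlines():
        fields = line.split(",")
        fields[0] = replace(fields[0])
        out.append(",".join(fields))
    return "\n".join(out)


class FakeFlowFile(object):
    """Hypothetical stand-in for a NiFi FlowFile: content plus attributes."""
    def __init__(self, content):
        self.content = content
        self.attributes = {}


class FakeSession(object):
    """Hypothetical stand-in exposing only the calls this sketch uses."""
    def __init__(self, flow_file):
        self._flow_file = flow_file
        self.transferred = []

    def get(self):
        return self._flow_file

    def putAttribute(self, flow_file, key, value):
        flow_file.attributes[key] = value
        return flow_file

    def transfer(self, flow_file, relationship):
        self.transferred.append((flow_file, relationship))


def run_script(session, rel_success="success"):
    """Mirror the top-level body of the script against any session object."""
    flow_file = session.get()
    if flow_file is not None:
        flow_file.content = anonymize_lines(flow_file.content, lambda name: "REDACTED")
        flow_file = session.putAttribute(flow_file, "anonymized", "true")
        session.transfer(flow_file, rel_success)


class RunScriptTest(unittest.TestCase):
    def test_first_field_is_replaced(self):
        session = FakeSession(FakeFlowFile("alice,a@example.org\nbob,b@example.org"))
        run_script(session)
        flow_file, relationship = session.transferred[0]
        self.assertEqual(flow_file.content,
                         "REDACTED,a@example.org\nREDACTED,b@example.org")
        self.assertEqual(flow_file.attributes, {"anonymized": "true"})
        self.assertEqual(relationship, "success")
```

Keeping the per-line logic in its own function means most bugs can be found with an ordinary test runner before the script is pasted into the processor.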
________________________________________
From: Vyshali <[hidden email]>
Sent: Thursday, November 2, 2017 1:31 PM
To: [hidden email]
Subject: Issue with Executescript


Re: Issue with Executescript

Andy Christianson-2
Vyshali,

Please also take a look at your splitlines call. splitlines is defined on Python's str class, but not on Java's String. This is Jython, so IOUtils.toString returns a Java string, not a Python string, and that string is split in a different way.

Regards,

Andy
________________________________________
From: Andy Christianson <[hidden email]>
Sent: Thursday, November 2, 2017 1:40 PM
To: [hidden email]
Subject: Re: Issue with Executescript


Re: Issue with Executescript

Vyshali
Thank you Andy.

How can I convert "text" into a list or array, so that I can get rid of the
splitlines function altogether?

Regards,
Vyshali




Re: Issue with Executescript

Andy Christianson-2
Hi Vyshali,

Follow the usual Java String API [1]. Instead of splitlines, use .split(String regex).

-Andy

[1]: https://docs.oracle.com/javase/6/docs/api/java/lang/String.html
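
For comparison, a small sketch in plain Python (not Jython-specific) of turning text into a list of lines. Java's String.split(String regex) takes a regular expression, whereas Python's str.split takes a literal separator, so re.split is the closer analogue. The sample data is invented for illustration.

```python
import re

text = "name,email\r\nAlice,alice@example.org\nBob,bob@example.org\n"

# re.split treats its first argument as a regular expression, like Java's
# String.split(String regex); "\r?\n" accepts both Windows and Unix line ends.
lines = [line for line in re.split(r"\r?\n", text) if line]

# Each element is still a plain string, so fields are reached by position.
rows = [line.split(",") for line in lines]
```

Here lines is a list of three strings and rows is a list of lists; neither knows anything about column names.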
________________________________________
From: Vyshali <[hidden email]>
Sent: Thursday, November 2, 2017 2:14 PM
To: [hidden email]
Subject: Re: Issue with Executescript


Re: Issue with Executescript

Vyshali
In reply to this post by Andy Christianson-2
Hi,

I replaced splitlines with split(). I'm now getting an error like "Unicode
indices must be integer". The "text" is now in unicode format, which I'm
encoding to UTF-8. I'm not sure what I'm missing.

Regards,
Vyshali




Re: Issue with Executescript

Matt Burgess-2
Vyshali,

I saw you imported unicodecsv but I didn't see where/if you used it.
If you don't use csv.reader() or something, you won't have a data
structure where you can reference fields by name (i.e. how would it
know where "name" is in a line of text?). I'm not very familiar with
the Python csv stuff so I can't comment on whether your script has the
correct usage of that, in terms of replacing the fields inline.

Regards,
Matt
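
To illustrate the point about needing a data structure with named fields, here is a minimal sketch using the standard csv module. It is written for CPython 3 with invented sample data; under Python 2.7 (and so Jython 2.7) the standard csv module does not support Unicode input, which is presumably why unicodecsv was imported, though that is an assumption.

```python
import csv
import io

data = u"name,email\nAlice,alice@example.org\nBob,bob@example.org\n"

line = data.splitlines()[1]
try:
    line["name"]              # a string only accepts integer or slice indices
except TypeError as exc:
    error = str(exc)          # same kind of failure as the reported error

# DictReader uses the header row as keys, so every row becomes a mapping.
rows = list(csv.DictReader(io.StringIO(data)))
names = [row["name"] for row in rows]
```

After this runs, names holds the values of the "name" column, looked up by field name rather than by position.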

On Fri, Nov 3, 2017 at 2:35 AM, Vyshali <[hidden email]> wrote:


Re: Issue with Executescript

Vyshali
In reply to this post by Andy Christianson-2
Hi,

I have modified the code after reading up on DictReader. However, I'm still
facing some issues. My code is included below. A CSV file with the fields
name, phone_number, email and ssn is read by a "GetFile" processor and then
sent as input to ExecuteScript. I'm using DictReader to read the data and
modify the fields with the Faker package, but the modifications don't take
effect: I get empty JSON at the end. Please give me some suggestions on how to
solve this problem.

import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import unicodecsv as csv
from faker import Factory
from collections import defaultdict
import json
import csv
import io

class TransformCallback(StreamCallback):
    def _init_(self):
        pass

    def process(self,inputStream,outputStream):
        inputdata = IOUtils.toString(inputStream,StandardCharsets.ISO_8859_1)
        text = csv.DictReader(io.StringIO(inputdata))
        faker  = Factory.create()
        names  = defaultdict(faker.name)
        emails = defaultdict(faker.email)
        ssns = defaultdict(faker.ssn)
        phone_numbers = defaultdict(faker.phone_number)

        for row in text:
             row["name"] = names[row["name"]]
             row["email"] = emails[row["email"]]
             row["ssn"] = ssns[row["ssn"]]
             row["phone_number"] = phone_numbers[row["phone_number"]]
        textdata = list(text)
        values_str = json.dumps(textdata)
        outputStream.write(values_str.encode('utf-8'))

flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile,TransformCallback())
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()

References of the code:
http://go.databricks.com/hubfs/notebooks/blogs/Healthcare%20PII%20anonymization/Healthcare%20PII%20anonymization%20example.html
https://stackoverflow.com/questions/31658115/python-csv-dictreader-parse-string
https://stackoverflow.com/questions/19664145/how-to-convert-list-of-nested-dictionaries-into-string-and-vice-versa


Thanks,
Vyshali
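
One likely reason for the empty JSON (an observation about how csv.DictReader behaves, not something confirmed in the thread): the reader is a one-pass iterator, so once the for loop has consumed it, a later list(text) has nothing left to return. A minimal sketch with invented data, written for CPython 3:

```python
import csv
import io
import json

data = u"name,ssn\nAlice,111-11-1111\nBob,222-22-2222\n"

reader = csv.DictReader(io.StringIO(data))
for row in reader:
    row["name"] = "X"          # modifies a dict that is then discarded
leftover = list(reader)        # the reader is already exhausted
empty_json = json.dumps(leftover)

# Collecting the rows while iterating keeps the modified copies.
reader = csv.DictReader(io.StringIO(data))
kept = []
for row in reader:
    row["name"] = "X"
    kept.append(dict(row))
```

In this sketch leftover is an empty list and empty_json is an empty JSON array, while kept holds both modified rows.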






Re: Issue with Executescript

Matt Burgess-2
Vyshali,

This would make an excellent example of a ScriptedRecordSetWriter [1].
You can use ConvertRecord with a CSVReader to read in the records
(which can be accessed by field name), and replace each with a fake
instance of the same using your ScriptedRecordSetWriter
implementation. I see you're using defaultdict() to initialize fake
instances on-the-fly, to replace the same field values in all records
with the same fake instance, thereby getting some anonymization while
retaining the referential integrity. Very cool!
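
The defaultdict behaviour described above can be sketched without Faker. Here fake_name is a hypothetical stand-in for faker.name that returns a new placeholder on every call; the point is only that defaultdict calls the factory once per unseen key and then reuses the cached value.

```python
from collections import defaultdict
from itertools import count

_ids = count(1)


def fake_name():
    """Stand-in for faker.name(): returns a new placeholder on every call."""
    return "Person-%d" % next(_ids)


# The factory runs only the first time a key is looked up.
names = defaultdict(fake_name)

originals = ["Alice", "Bob", "Alice", "Carol", "Bob"]
replaced = [names[n] for n in originals]
# replaced == ["Person-1", "Person-2", "Person-1", "Person-3", "Person-2"]
```

Every occurrence of the same original value maps to the same replacement, which is what preserves referential integrity across records.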

You can set up the defaultdict() stuff in the constructor to the
ScriptedRecordSetWriter, and can get access to the fields/values from
the incoming Record, then write out the fake lookups of them. Check
out Drew Lim's awesome article on how to use ScriptedRecordSetWriter
[2], he uses XML as the output format but you just need to write out
CSV or JSON or whatever, using the fake versions of the fields'
values.  Please let me know if you try this and run into any trouble,
we'd be happy to help get you going.

If you'd rather stick with ExecuteScript, try adding some logging in
there (using the provided "log" object, you can do things like
log.info("my text = " + text) and such) to see if you are getting the
lookups correctly, and getting the conversion to JSON correctly.
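
A rough sketch of the logging suggestion. Inside ExecuteScript the "log" object is provided by the processor; here ListLog is a hypothetical stand-in with only an info method, so that the function can be run and inspected outside NiFi. The function name anonymize_with_logging is likewise invented for illustration.

```python
class ListLog(object):
    """Hypothetical stand-in for NiFi's `log` object; it only records messages."""
    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(message)


def anonymize_with_logging(rows, lookup, log):
    """Replace each row's name via `lookup`, logging what was written."""
    result = []
    for row in rows:
        fake = lookup(row["name"])
        log.info("replaced name with %r" % fake)   # log the fake value only
        result.append(dict(row, name=fake))
    log.info("rows written = %d" % len(result))
    return result


log = ListLog()
out = anonymize_with_logging([{"name": "Alice"}], lambda n: "Person-1", log)
```

A "rows written = 0" message would show immediately that the loop body never ran.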

Regards,
Matt

[1] https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.4.0/org.apache.nifi.record.script.ScriptedRecordSetWriter/index.html
[2] https://community.hortonworks.com/articles/115311/convert-csv-to-json-avro-xml-using-convertrecord-p.html

On Sun, Nov 5, 2017 at 1:52 PM, Vyshali <[hidden email]> wrote:


Re: Issue with Executescript

Vyshali
Matt,

Thank you so much for your suggestion. I would like to stay with
ExecuteScript, though, since I'm almost done with the code. I will try the
processors you mentioned in the future.

I'm still stuck on a problem in my code, which I have added here:

import java.io
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import unicodecsv as csv
from faker import Factory
from collections import defaultdict
import json
import csv
import io

class TransformCallback(StreamCallback):
    def _init_(self):
        pass

    def process(self,inputStream,outputStream):
        inputdata = IOUtils.toString(inputStream,StandardCharsets.ISO_8859_1)
        text = csv.DictReader(io.StringIO(inputdata))
        textdata = list(text)
        length = len(textdata)
        outputstr = '['
        i = 1
        faker  = Factory.create()
        names  = defaultdict(faker.name)
        emails = defaultdict(faker.email)
        ssns = defaultdict(faker.ssn)
        phone_numbers = defaultdict(faker.phone_number)
        output = defaultdict(list)
        for row in text:
                for k,v in row.items():
                        if k == "name":
                                output['name'] = names[v]
                        elif k == "email":
                                output['email'] = emails[v]
                        elif k == "ssn":
                                output['ssn'] = ssns[v]
                        elif k == "phone_number":
                                output['phone_number'] = phone_numbers[v]
                        else:
                                output[k] = v
                outputstr += json.dumps(output)
                if i == length:
                        outputstr = outputstr+']'
                else:
                        outputstr += ','
                        i = i+1
        outputstr = json.dumps(output)
        outputStream.write(outputstr.encode('utf-8'))

flowFile = session.get()
if flowFile != None:
    flowFile = session.write(flowFile,TransformCallback())
    session.transfer(flowFile, REL_SUCCESS)
    session.commit()


I'm appending each "output" dictionary to "outputstr" so that I can write it
to the flowfile. I'm building "outputstr" so that it is in JSON format, so the
expected output would be

[{"phone_number": "(620)790-6114x4000", "ssn": "575-97-5718", "email":
"[hidden email]", "name": "Kenneth Bradley"},{"phone_number":
"(000)790-6114x4000", "ssn": "470-97-5718", "email": "[hidden email]",
"name": "Romeo Bradley"}]

But the output I get is "[".
There is some problem with "output" getting appended to "outputstr".
Please help me with a suggestion. I'm not able to figure out where I have
gone wrong.

Thanks,
Vyshali
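
In the script above, list(text) consumes the DictReader before the for loop starts, so the loop body likely never runs (an observation, not something confirmed in the thread). A sketch of one way to restructure it, written for CPython 3: read the rows once, loop over the resulting list, and let json.dumps serialize a list of dictionaries instead of assembling brackets and commas by hand. make_fake is a hypothetical stand-in for the Faker providers, and anonymize_csv is an invented name.

```python
import csv
import io
import json
from collections import defaultdict
from itertools import count

SENSITIVE = ("name", "email", "ssn", "phone_number")


def make_fake(field):
    """Hypothetical stand-in for a Faker provider such as faker.name."""
    counter = count(1)
    return lambda: "%s-%d" % (field, next(counter))


def anonymize_csv(text):
    """Return a JSON array with the sensitive columns of `text` replaced."""
    rows = list(csv.DictReader(io.StringIO(text)))   # read the input once
    lookups = dict((f, defaultdict(make_fake(f))) for f in SENSITIVE)
    output = []
    for row in rows:                                  # iterate the list, not the reader
        clean = {}
        for key, value in row.items():
            clean[key] = lookups[key][value] if key in lookups else value
        output.append(clean)
    return json.dumps(output)                         # json adds brackets and commas
```

Inside the StreamCallback, the returned string would then be encoded and passed to outputStream.write, as the original script already does.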





Re: Issue with Executescript

Vyshali
In reply to this post by Andy Christianson-2
Hi Andy,

I have successfully written the anonymization logic and was able to execute
it without errors. However, I'm getting different results when running the
same script on the same input in NiFi and as a normal Python script. I'm not
sure what the problem is.

Sample dataset:
Sharmila,[hidden email],999-12-0000
narasimha srinivasan,[hidden email],222-26-789
avyukt,[hidden email],456-89-5678

I have used Faker's seed functionality to keep the anonymization results
consistent, so I should get the same results across multiple Faker instances.
I'm running the anonymizing code both as a normal Python script and in NiFi
using the ExecuteScript processor.

When I run it as a Python script, I get the following output:
Scott Bryan,[hidden email],712-48-4862
James Miranda,[hidden email],446-57-4047
James Jordan,[hidden email],887-47-4663

When I execute the script in NiFi using the ExecuteScript processor, I get
the following output:
Andrew Simon,[hidden email],621-02-7781
Gregory Grant,[hidden email],709-80-9027
Holly Nelson,[hidden email],867-56-9800

Could the problem be due to NiFi using Jython? If so, how can we get
consistent results across the two implementations?
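
The thread does not answer this question. One possible explanation, offered only as an assumption, is that a seeded Faker depends on Python's random module and on the order in which values are requested, so a standalone run on a different Python version than Jython 2.7 need not make the same picks. A different technique that avoids the random generator altogether (this is not what the script above does) is to derive each replacement from a keyed hash of the original value. REPLACEMENT_NAMES, SECRET_KEY and pseudonym are hypothetical names chosen for illustration.

```python
import hashlib
import hmac

# Hypothetical replacement pool and key, chosen only for illustration.
REPLACEMENT_NAMES = ["Avery Stone", "Blake Rivers", "Casey Hill", "Devon Lake"]
SECRET_KEY = b"change-me"


def pseudonym(value, pool=REPLACEMENT_NAMES, key=SECRET_KEY):
    """Map `value` to a pool entry using a keyed SHA-256 digest."""
    digest = hmac.new(key, value.encode("utf-8"), hashlib.sha256).hexdigest()
    # The digest depends only on the key and the value, not on interpreter
    # state, so the same input selects the same entry on every run.
    return pool[int(digest, 16) % len(pool)]
```

With a pool this small, distinct inputs will often share a replacement; a real pool would need to be much larger, or the digest itself could be used to build the replacement value.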




Re: Issue with Executescript

Joe Witt
It is awesome that you're using NiFi and engaging with the community.
However, please subscribe to the mailing list.  I will no longer
manually moderate these, which means they will not be seen unless you
subscribe or another moderator chooses to manually moderate.  If
you're unsure how to subscribe, please let us know.  See
https://nifi.apache.org/mailing_lists.html for more detail.

Thanks

On Thu, Nov 16, 2017 at 10:38 AM, Vyshali <[hidden email]> wrote:


Re: Issue with Executescript

Vyshali