Formatting issues with Json: what is the best approach in NiFi?

idioma
Hi,
I have a fairly complex dataflow, and halfway through it I get back a Json file that looks like this:

{"field1": "D",
 "field2": "12345",
 "field3": "myText",
 "field4": ,
 "field5": "B2",
 "field6": "B",
 "field7": 74664",
 "field8": 2,
 "field9": [something."2334", something."9973"],
 "field10": ,
 "field11": "9,
 "field12": "J"}

Later on I will try to ingest it into ElasticSearch, and I am sure I will run into trouble because the format is not valid. There are at least the following issues:

1) mismatched '"'
2) null (missing) values
3) values not wrapped in '"'
4) values wrongly wrapped in '"', for example something."2334", which should be "something.2334"

What is the recommended approach for cleaning "messy" Json? Do you usually resort to ExecuteScript?  

Thanks,

I.

Re: Formatting issues with Json: what is the best approach in NiFi?

idioma
Hi, I have had a look at the available processors and it seems I can do the following:

GetFile (with my json already created) ->EvaluateJsonPath -> UpdateAttribute ->PutFile

So, let's get started by simply adding the missing '"'. I am under the impression that I can use something like this:

${myAttribute.1:append('"')}

where the attribute myAttribute.1 holds the value of, for example, field7. My question is: how do I get hold of that attribute from the incoming flowfile? Can you help?

Thanks,

Re: Formatting issues with Json: what is the best approach in NiFi?

Andy LoPresto-2
You might also want to look at TransformJOLTProcessor [1] now available in 0.7.0 and 1.0 that allows various JOLT [2][3] operations on JSON. This was just added by Yolanda Davis and could be very helpful for you. 


Andy LoPresto
PGP Fingerprint: 70EC B3E5 98A6 5A3F D3C4  BACE 3C6E F65B 2F7D EF69

(In reply to idioma's message of May 17, 2016, at 8:54 AM.)

Re: Formatting issues with Json: what is the best approach in NiFi?

idioma
Andy, thanks for sharing this with me, I was not aware of it. Unfortunately, for a number of reasons, I will need to use 0.5.1 for the time being, so I was looking at existing processors. I have had a go with the combination EvaluateJsonPath + UpdateAttribute and, although I am now a bit more familiar with JsonPath expressions, this approach only works if the incoming Json is already valid. I believe that in this case I should again resort to a Groovy script run as part of ExecuteScript. If you can think of anything else, please drop your suggestion here.
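
As a rough illustration of what such a Groovy script might do at the text level, here is a minimal sketch that repairs the four problems listed in the first message, line by line, with regular expressions. The helper name repair and the patterns are assumptions written for this exact sample, not a general JSON fixer: they assume one field per line and no commas, colons or quotes inside string values.

```groovy
import groovy.json.JsonSlurper

// Hypothetical helper: applies four sample-specific fixes to each line
String repair(String raw) {
    raw.readLines().collect { line ->
        line
            .replaceAll(/(\w+)\."(\w+)"/, '"$1.$2"')         // something."2334" -> "something.2334"
            .replaceAll(/:\s*(?=[,}]|$)/, ': null')           // missing value -> null
            .replaceAll(/:\s*([^"\s,\[\]{}]+)"/, ': "$1"')    // 74664" -> "74664"
            .replaceAll(/:\s*"([^",]*)(?=,\s*$)/, ': "$1"')   // "9, -> "9",
    }.join('\n')
}

// The malformed sample from the first message
String sample = '''{"field1": "D",
 "field2": "12345",
 "field3": "myText",
 "field4": ,
 "field5": "B2",
 "field6": "B",
 "field7": 74664",
 "field8": 2,
 "field9": [something."2334", something."9973"],
 "field10": ,
 "field11": "9,
 "field12": "J"}'''

// After the repair, a strict parser should accept the text
def parsed = new JsonSlurper().parseText(repair(sample))
println parsed
```

Inside ExecuteScript the same helper would be applied to the flowfile content rather than to a literal string. Anything the patterns do not anticipate will still fail to parse, so routing such flowfiles to a failure relationship remains advisable.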

Thank you so much

Re: Formatting issues with Json: what is the best approach in NiFi?

Andy LoPresto-2
Ilaria,

Matt just wrote a JSON validator Groovy script for another answer on the mailing list. I've included it below. You can modify it to validate the incoming JSON against your own schema and then apply the various fixes. I'm not familiar with the library Matt is suggesting, but I would imagine the validate method throws specific errors or has optional return values that help identify what needs to be fixed.

As for getting an attribute into the script, flowFile.getAttribute("attributeName") is the easiest way. In the script Matt wrote below, the JSON is the flowfile content, not an attribute, so he uses session.read(flowFile, <Closure that accepts an InputStream, cast as InputStreamCallback>) to declare an inline block that reads the flowfile content and operates on it.
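
To make the content-versus-attribute distinction concrete, here is a small sketch of the closure shape involved. It runs outside NiFi by feeding the closure an in-memory stream; the names content and readAll are illustrative assumptions, and the ExecuteScript calls appear only as comments because session and flowFile exist only inside the processor.

```groovy
import java.nio.charset.StandardCharsets

// Holds the text read by the closure; a script-local variable the closure can assign to
String content = null

// Same shape as the closure handed to session.read(...): it receives an InputStream
def readAll = { InputStream inputStream ->
    content = inputStream.getText(StandardCharsets.UTF_8.name())
}

// Outside NiFi, exercise the closure with an in-memory stream
readAll(new ByteArrayInputStream('{"field1": "D"}'.getBytes(StandardCharsets.UTF_8)))
println content

// Inside ExecuteScript the equivalent calls would be:
//   session.read(flowFile, readAll as InputStreamCallback)   // flowfile content
//   def value = flowFile.getAttribute('attributeName')       // flowfile attribute
```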

Hope this helps. 

In the meantime, here's a Groovy script you could use in ExecuteScript; you just need to download the two JAR dependencies ([2] and [3]) and add them to your Module Directory property.

import org.apache.nifi.processor.io.InputStreamCallback
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

// Take the next flowfile from the queue; nothing to do if there is none
flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
 "type": "object",
 "required": ["name", "tags", "timestamp", "fields"],
 "properties": {
   "name": {"type": "string"},
   "timestamp": {"type": "integer"},
   "tags": {"type": "object", "items": {"type": "string"}},
   "fields": { "type": "object"}
 }
}
"""

boolean valid = true
// Read the flowfile content (not an attribute) and validate it against the schema
session.read(flowFile, { inputStream ->
  jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
  JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
  Schema schema = SchemaLoader.load(rawSchema)
  try {
     // new JSONObject(...) throws on malformed JSON; validate(...) throws on schema violations
     schema.validate(new JSONObject(jsonInput))
   } catch(ve) {
     log.error("Doesn't adhere to schema", ve)
     valid = false
   }
 } as InputStreamCallback)

// Route by validation result
session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)


Hope this helps!

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-1893
[2] http://mvnrepository.com/artifact/org.everit.json/org.everit.json.schema/1.3.0
[3] http://mvnrepository.com/artifact/org.json/json/20160212

Andy LoPresto

(In reply to idioma's message of May 17, 2016, at 12:51 PM.)

Re: Formatting issues with Json: what is the best approach in NiFi?

idioma
Andy,
thank you so much for this, it sounds like a very interesting idea. To clarify, are you suggesting that I modify the existing Json validator so that it can read the invalid Json? Will I then be able to pass it successfully to EvaluateJsonPath? Have I understood it correctly?

Thank you,

I.

Re: Formatting issues with Json: what is the best approach in NiFi?

Andy LoPresto-2
My idea is to run something like the following:

import org.apache.nifi.processor.io.InputStreamCallback
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

// define your schema between the braces below
jsonSchema = """
{
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
  jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
  JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
  Schema schema = SchemaLoader.load(rawSchema)
  try {
     schema.validate(new JSONObject(jsonInput))
   } catch(ve) {
     log.error("Doesn't adhere to schema", ve)
     // inspect the validation exception and use it to determine if you need to escape quotes, add empty strings for missing values, etc.
   }
 } as InputStreamCallback)

session.transfer(flowFile, REL_SUCCESS)


Another option would be to use the JsonParserLax [1] option in Groovy to read the incoming JSON, unmarshal it to a Groovy map, apply a series of common transformations (you’ll have more control using native Groovy types rather than manipulating a long string), and then marshal it back to well-formed JSON. This might be an easier method than what I originally described above. 
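
A minimal sketch of that round trip, assuming Groovy's JsonSlurper with JsonParserType.LAX (which selects the lax parser). The input here only uses unquoted keys and single-quoted strings; whether the lax parser tolerates every defect in the original sample (missing values, unbalanced quotes) is not something this sketch establishes, so some text-level clean-up may still be needed first. The variable names and the trimming step are illustrative assumptions.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonParserType
import groovy.json.JsonSlurper

// Lenient input: unquoted keys and single-quoted strings are not valid strict JSON
String lenient = "{field1: ' D ', field2: '12345', field8: 2}"

// Unmarshal with the lax parser into a Groovy map
def parsed = new JsonSlurper().setType(JsonParserType.LAX).parseText(lenient)

// Apply a transformation on native types: trim every string value, leave the rest untouched
def cleaned = parsed.collectEntries { key, value ->
    [(key): (value instanceof CharSequence ? value.toString().trim() : value)]
}

// Marshal back to well-formed JSON
String wellFormed = JsonOutput.toJson(cleaned)
println wellFormed
```

Working on the map keeps each fix a plain Groovy operation on keys and values, which is usually easier to reason about than patching one long string.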


Andy LoPresto

(In reply to idioma's message of May 18, 2016, at 1:44 PM.)