NiFi 1.2.0 Record processors question

NiFi 1.2.0 Record processors question

Joe Gresock
I've tried a couple different configurations of CSVReader /
JsonRecordSetWriter with the QueryRecord processor, and I don't think I
quite have the usage down yet.

Can someone give a basic example configuration in the following 2
scenarios?  I followed most of Matt Burgess's response to the post titled
"How to use ConvertRecord Processor", but I don't think it tells the whole
story.

1) QueryRecord, converting CSV to JSON, using only the CSV headers to
determine the schema.  (I tried selecting Use String Fields from Header in
CSVReader, but the processor really seems to want to use the
JsonRecordSetWriter to determine the schema)

2) QueryRecord, converting CSV to JSON, using a cached avro schema.  I
imagine I need to use InferAvroSchema here, but I'm not sure how to cache
it in the AvroSchemaRegistry.  Also not quite sure how to configure the
properties of each controller service in this case.

Any help would be appreciated.

Joe

--
I know what it is to be in need, and I know what it is to have plenty.  I
have learned the secret of being content in any and every situation,
whether well fed or hungry, whether living in plenty or in want.  I can do
all this through him who gives me strength.    *-Philippians 4:12-13*

Re: NiFi 1.2.0 Record processors question

Joe Witt
Joe

Can you share a template with sample data?

These are powerful tools for which we need examples and docs to catch up,
although the docs there now are already really helpful.

I'm happy to help you get your flow working or identify gaps as is.

Thanks
Joe


Re: NiFi 1.2.0 Record processors question

Matt Burgess-2
In reply to this post by Joe Gresock
Joe,

Using the CSV Headers to determine the schema is currently the only
"dynamic" schema strategy, so it will be tricky to use with the other
Readers/Writers and associated processors (which require an explicit
schema). This should be alleviated with NIFI-3921 [1].  For this first
release, I believe the approach would be to identify the various
schemas for your incoming/outgoing data, create a Schema Registry with
all of them, then configure the various Record Readers/Writers to use those.

There were some issues during development related to using the
incoming vs. outgoing schema for various record operations; if
QueryRecord seems to be using the output schema for querying, then it
is likely a bug. However, in this case it might just be that you need
an explicit schema for your Writer that matches the input schema
(which is inferred from the CSV header). The CSV header inference
currently assumes all fields are Strings, so a nominal schema would
have the same number of fields as columns, each with type String. If
you don't know the number of columns and/or the column names are
dynamic per CSV file, I believe we have a gap here (for now).
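
For illustration, such a nominal all-String schema for a hypothetical
three-column CSV (the column names id, name, city are made up here) would
look like:

```json
{
  "type": "record",
  "name": "csv_record",
  "fields": [
    { "name": "id",   "type": "string" },
    { "name": "name", "type": "string" },
    { "name": "city", "type": "string" }
  ]
}
```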

I thought of maybe having an InferRecordSchema processor that would
fill in the avro.text attribute for use in various downstream record
readers/writers, but inferring schemas in general is not a trivial
task. An easier interim solution might be to have an
AddSchemaAsAttribute processor, which takes a Reader to parse the
records and determine the schema (whether dynamic or static), and set
the avro.text attribute on the original incoming flow file, then
transfer the original flow file. This would require two reads, one by
AddSchemaAsAttribute and one by the downstream record processor, but
it should be fairly easy to implement.  Then again, since new features
would go into 1.3.0, hopefully NIFI-3921 will be implemented by then,
rendering all this moot :)

Regards,
Matt

[1] https://issues.apache.org/jira/browse/NIFI-3921


Re: NiFi 1.2.0 Record processors question

Joe Gresock
Matt,

Great response, this does help explain a lot.  Reading through your post
made me realize I didn't understand the AvroSchemaRegistry.  I had been
thinking it was something that NiFi processors populated, but I re-read its
usage description and it does indeed say to use dynamic properties for the
schema name / value.  In that case, I can definitely see how this is not
dynamic in the sense of inferring any schemas on the flow.  It makes me
wonder if there would be a way to populate the schema registry from flow
files.  When I first glanced at the processors, I had assumed that when the
schema was inferred from the CSV headers, it would create an entry in the
AvroSchemaRegistry, provided you filled in the correct properties.  Clearly
this is not how it works.

Just so I understand, does your first paragraph mean that even if you use
the CSV headers to determine the schema, you still can't use the *Record
processors unless you manually register a matching schema in the schema
registry, or otherwise somehow set the schema in an attribute?  In this
case, it almost seems like inferring the schema from the CSV headers serves
no purpose, and I don't see how NIFI-3921 would alleviate that (it only
appears to address avro flow files with embedded schema).

Based on this understanding, I was able to successfully get the following
flow working:
InferAvroSchema -> QueryRecord.

QueryRecord uses CSVReader with "Use Schema Text Property" and Schema Text
set to ${inferred.avro.schema} (which is populated by the InferAvroSchema
processor).  It also uses JsonRecordSetWriter with a similar
configuration.  I could attach a template, but I don't know the best way to
do that on the listserv.
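
In rough property form (a sketch, not an exported template; property names
approximate NiFi 1.2.0), that configuration is:

```
# CSVReader (controller service)
Schema Access Strategy : Use Schema Text Property
Schema Text            : ${inferred.avro.schema}

# JsonRecordSetWriter (controller service)
Schema Access Strategy : Use Schema Text Property
Schema Text            : ${inferred.avro.schema}
```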

Joe


Re: NiFi 1.2.0 Record processors question

Bryan Bende
When a reader produces a record it attaches the schema it used to the
record, but we currently don't have a way for a writer to use that
schema when writing a record, although I think we do want to support
that... something like a "Use Schema in Record" option as a choice in
the 'Schema Access Strategy' of writers.

For now, when a processor uses a reader and a writer, and you also
want to read and write with the same schema, then you would still have
to define the same schema for the writer to use even if you had a CSV
reader that inferred the schema from the headers.

There are some processors that only use a reader, like
PutDatabaseRecord, where using the CSV header would still be helpful.

There are also a lot of cases where you would write with a
different schema than you read with, so using the CSV header for
reading is still helpful in those cases too.

Hopefully I am making sense and not confusing things more.



Re: NiFi 1.2.0 Record processors question

Matt Burgess-2
In reply to this post by Joe Gresock
I can update the Jira with more information around using the incoming
record's schema, Koji wrote it originally for a particular use case,
embedded schemas in a flow file vs input record schemas available to
the record processors. If they are too different, a new Jira would be
spawned. But in any case, I would say the CSV Schema From Headers
probably won't play nice with most of the other record stuff for now.

As a workaround, I put up a Gist [1], which is a Groovy script that can
be used in ExecuteScript to extract the Avro schema from the header
line of a CSV file into an attribute called avro.schema. This lends
itself to the "Use Schema Text" strategy, to help avoid the issue with
the CSVReader "Schema Names from Header" strategy.
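
The idea behind the Gist can be sketched as follows (in Python here purely
as an illustration; the actual Gist is a Groovy ExecuteScript script, and
the function and record name below are made up):

```python
import csv
import io
import json

def avro_schema_from_csv_header(csv_text, record_name="nifi_record"):
    """Build an all-String Avro schema (as JSON text) from the first line
    of a CSV. In the flow, the result would be placed in an attribute such
    as avro.schema, for use with the "Use Schema Text" access strategy."""
    # Parse only the header row of the CSV content.
    header = next(csv.reader(io.StringIO(csv_text)))
    # One String-typed Avro field per column.
    fields = [{"name": col.strip(), "type": "string"} for col in header]
    return json.dumps({"type": "record", "name": record_name, "fields": fields})
```

In the real flow this logic runs inside ExecuteScript and writes the result
onto the flow file as an attribute; the function above only sketches the
header-to-schema step.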

Regards,
Matt

[1] https://gist.github.com/mattyb149/6c9ac2d0961b8ff38ad716646f45b073



Re: NiFi 1.2.0 Record processors question

Joe Gresock
In reply to this post by Bryan Bende
Yes, both of your examples help explain the use of the CSV header parsing.

I think I have a much better understanding of the new framework now, thanks
to Bryan and Matt.  Mission accomplished!


Re: NiFi 1.2.0 Record processors question

Koji Kawamura-2
I've updated the JIRA description to cover not only embedded Avro
schema but also ones such as derived from CSVReader.
https://issues.apache.org/jira/browse/NIFI-3921

Thanks,
Koji
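
The header-derived schemas Koji mentions can be sketched outside NiFi: CSV header inference yields an Avro record schema whose fields are all strings, one per column. This is a rough illustration, not NiFi source; the column names are invented:

```python
import json

# Sketch, not NiFi code: mimic CSVReader's "Use String Fields From Header"
# strategy, which derives an Avro schema with one string-typed field per
# CSV column. The header line is invented sample data.
def schema_from_header(header_line, record_name="nifi_record"):
    fields = [{"name": col.strip(), "type": "string"}
              for col in header_line.split(",")]
    return {"type": "record", "name": record_name, "fields": fields}

schema = schema_from_header("id,name,price")
print(json.dumps(schema))
```

This all-strings shape is also why, for now, a writer paired with such a reader needs an explicit schema with the same number of string-typed fields.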

On Sat, May 20, 2017 at 4:14 AM, Joe Gresock <[hidden email]> wrote:

> Yes, both of your examples help explain the use of the CSV header parsing.
>
> I think I have a much better understanding of the new framework now, thanks
> to Bryan and Matt.  Mission accomplished!
>
> On Fri, May 19, 2017 at 7:04 PM, Bryan Bende <[hidden email]> wrote:
>
>> When a reader produces a record it attaches the schema it used to the
>> record, but we currently don't have a way for a writer to use that
>> schema when writing a record, although I think we do want to support
>> that... something like a "Use Schema in Record" option as a choice in
>> the 'Schema Access Strategy' of writers.
>>
>> For now, when a processor uses a reader and a writer, and you also
>> want to read and write with the same schema, then you would still have
>> to define the same schema for the writer to use even if you had a CSV
>> reader that inferred the schema from the headers.
>>
>> There are some processors that only use a reader, like
>> PutDatabaseRecord, where using the CSV header would still be helpful.
>>
>> There are also a lot of cases where you would write with a
>> different schema than you read with, so using the CSV header for
>> reading is still helpful in those cases too.
>>
>> Hopefully I am making sense and not confusing things more.
>>
>>
>> On Fri, May 19, 2017 at 1:27 PM, Joe Gresock <[hidden email]> wrote:
>> > Matt,
>> >
>> > Great response, this does help explain a lot.  Reading through your post
>> > made me realize I didn't understand the AvroSchemaRegistry.  I had been
>> > thinking it was something that nifi processors populated, but I re-read
>> its
>> > usage description and it does indeed say to use dynamic properties for
>> the
>> > schema name / value.  In that case, I can definitely see how this is not
>> > dynamic in the sense of inferring any schemas on the flow.  It makes me
>> > wonder if there would be a way to populate the schema registry from flow
>> > files.  When I first glanced at the processors, I had assumed that when
>> the
>> > schema was inferred from the CSV headers, it would create an entry in the
>> > AvroSchemaRegistry, provided you filled in the correct properties.
>> Clearly
>> > this is not how it works.
>> >
>> > Just so I understand, does your first paragraph mean that even if you use
>> > the CSV headers to determine the schema, you still can't use the *Record
>> > processors unless you manually register a matching schema in the schema
>> > registry, or otherwise somehow set the schema in an attribute?  In this
>> > case, it almost seems like inferring the schema from the CSV headers
>> serves
>> > no purpose, and I don't see how NIFI-3921 would alleviate that (it only
>> > appears to address avro flow files with embedded schema).
>> >
>> > Based on this understanding, I was able to successfully get the following
>> > flow working:
>> > InferAvroSchema -> QueryRecord.
>> >
>> > QueryRecord uses CSVReader with "Use Schema Text Property" and Schema
>> Text
>> > set to ${inferred.avro.schema} (which is populated by the InferAvroSchema
>> > processor).  It also uses JsonRecordSetWriter with a similar
>> > configuration.  I could attach a template, but I don't know the best way
>> to
>> > do that on the listserv.
>> >
>> > Joe
>> >
>> > On Fri, May 19, 2017 at 4:59 PM, Matt Burgess <[hidden email]>
>> wrote:
>> >
>> >> Joe,
>> >>
>> >> Using the CSV Headers to determine the schema is currently the only
>> >> "dynamic" schema strategy, so it will be tricky to use with the other
>> >> Readers/Writers and associated processors (which require an explicit
>> >> schema). This should be alleviated with NIFI-3921 [1].  For this first
>> >> release, I believe the approach would be to identify the various
>> >> schemas for your incoming/outgoing data, create a Schema Registry with
>> >> all of them, then configure the various Record Readers/Writers to use them.
>> >>
>> >> There were some issues during development related to using the
>> >> incoming vs. outgoing schema for various record operations; if
>> >> QueryRecord seems to be using the output schema for querying, then it
>> >> is likely a bug. However, in this case it might just be that you need
>> >> an explicit schema for your Writer that matches the input schema
>> >> (which is inferred from the CSV header). The CSV Header inference
>> >> currently assumes all fields are Strings, so a nominal schema would
>> >> have the same number of fields as columns, each with type String. If
>> >> you don't know the number of columns and/or the column names are
>> >> dynamic per CSV file, I believe we have a gap here (for now).
>> >>
>> >> I thought of maybe having an InferRecordSchema processor that would
>> >> fill in the avro.text attribute for use in various downstream record
>> >> readers/writers, but inferring schemas in general is not a trivial
>> >> task. An easier interim solution might be to have an
>> >> AddSchemaAsAttribute processor, which takes a Reader to parse the
>> >> records and determine the schema (whether dynamic or static), and set
>> >> the avro.text attribute on the original incoming flow file, then
>> >> transfer the original flow file. This would require two reads, one by
>> >> AddSchemaAsAttribute and one by the downstream record processor, but
>> >> it should be fairly easy to implement.  Then again, since new features
>> >> would go into 1.3.0, hopefully NIFI-3921 will be implemented by then,
>> >> rendering all this moot :)
>> >>
>> >> Regards,
>> >> Matt
>> >>
>> >> [1] https://issues.apache.org/jira/browse/NIFI-3921
>> >>
>> >> On Fri, May 19, 2017 at 12:25 PM, Joe Gresock <[hidden email]>
>> wrote:
>> >> > I've tried a couple different configurations of CSVReader /
>> >> > JsonRecordSetWriter with the QueryRecord processor, and I don't think
>> I
>> >> > quite have the usage down yet.
>> >> >
>> >> > Can someone give a basic example configuration in the following 2
>> >> > scenarios?  I followed most of Matt Burgess's response to the post
>> titled
>> >> > "How to use ConvertRecord Processor", but I don't think it tells the
>> >> whole
>> >> > story.
>> >> >
>> >> > 1) QueryRecord, converting CSV to JSON, using only the CSV headers to
>> >> > determine the schema.  (I tried selecting Use String Fields from
>> Header
>> >> in
>> >> > CSVReader, but the processor really seems to want to use the
>> >> > JsonRecordSetWriter to determine the schema)
>> >> >
>> >> > 2) QueryRecord, converting CSV to JSON, using a cached avro schema.  I
>> >> > imagine I need to use InferAvroSchema here, but I'm not sure how to
>> cache
>> >> > it in the AvroSchemaRegistry.  Also not quite sure how to configure
>> the
>> >> > properties of each controller service in this case.
>> >> >
>> >> > Any help would be appreciated.
>> >> >
>> >> > Joe
>> >> >
>> >> > --
>> >> > I know what it is to be in need, and I know what it is to have
>> plenty.  I
>> >> > have learned the secret of being content in any and every situation,
>> >> > whether well fed or hungry, whether living in plenty or in want.  I
>> can
>> >> do
>> >> > all this through him who gives me strength.    *-Philippians 4:12-13*
>> >>
>> >
>> >
>> >
>> > --
>> > I know what it is to be in need, and I know what it is to have plenty.  I
>> > have learned the secret of being content in any and every situation,
>> > whether well fed or hungry, whether living in plenty or in want.  I can
>> do
>> > all this through him who gives me strength.    *-Philippians 4:12-13*
>>
>
>
>
> --
> I know what it is to be in need, and I know what it is to have plenty.  I
> have learned the secret of being content in any and every situation,
> whether well fed or hungry, whether living in plenty or in want.  I can do
> all this through him who gives me strength.    *-Philippians 4:12-13*

RE: [EXT] Re: NiFi 1.2.0 Record processors question

Peter Wicks (pwicks)
I appreciate the clarification as well. I was really confused about why my Avro files weren't converting, and this explains it. I have to say, though, that the error messages you run into in this scenario are not clear.
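
The working pattern Joe describes (infer the schema once, then hand that same schema to both the reader and the writer) can be sketched roughly in plain Python; this is an analogue of the flow, not NiFi itself, and the CSV sample is invented:

```python
import csv
import io
import json

# Rough analogue (plain Python, not NiFi) of InferAvroSchema -> QueryRecord:
# a single inferred schema is supplied to both the CSV "reader" and the
# JSON "writer", since the writer does not inherit the reader's schema.
csv_text = "id,name\n1,apple\n2,banana\n"  # invented sample data

header = csv_text.splitlines()[0].split(",")
inferred_schema = {  # stands in for the inferred.avro.schema attribute
    "type": "record", "name": "row",
    "fields": [{"name": c, "type": "string"} for c in header],
}

# Read records using the schema's field names...
field_names = [f["name"] for f in inferred_schema["fields"]]
records = [{k: row[k] for k in field_names}
           for row in csv.DictReader(io.StringIO(csv_text))]

# ...and write JSON using the very same schema.
json_out = json.dumps(records)
```

The point the thread settles on is exactly this sharing: until something like a "Use Schema in Record" writer option exists, both controller services must be pointed at the same schema text.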

-----Original Message-----
From: Koji Kawamura [mailto:[hidden email]]
Sent: Monday, May 22, 2017 9:59 AM
To: dev <[hidden email]>
Subject: [EXT] Re: NiFi 1.2.0 Record processors question

I've updated the JIRA description to cover not only embedded Avro schema but also ones such as derived from CSVReader.
https://issues.apache.org/jira/browse/NIFI-3921

Thanks,
Koji

On Sat, May 20, 2017 at 4:14 AM, Joe Gresock <[hidden email]> wrote:

> Yes, both of your examples help explain the use of the CSV header parsing.
>
> I think I have a much better understanding of the new framework now,
> thanks to Bryan and Matt.  Mission accomplished!
>
> On Fri, May 19, 2017 at 7:04 PM, Bryan Bende <[hidden email]> wrote:
>
>> When a reader produces a record it attaches the schema it used to the
>> record, but we currently don't have a way for a writer to use that
>> schema when writing a record, although I think we do want to support
>> that... something like a "Use Schema in Record" option as a choice in
>> the 'Schema Access Strategy' of writers.
>>
>> For now, when a processor uses a reader and a writer, and you also
>> want to read and write with the same schema, then you would still
>> have to define the same schema for the writer to use even if you had
>> a CSV reader that inferred the schema from the headers.
>>
>> There are some processors that only use a reader, like
>> PutDatabaseRecord, where using the CSV header would still be helpful.
>>
>> There are also a lot of cases where you would write with a
>> different schema than you read with, so using the CSV header for
>> reading is still helpful in those cases too.
>>
>> Hopefully I am making sense and not confusing things more.
>>
>>
>> On Fri, May 19, 2017 at 1:27 PM, Joe Gresock <[hidden email]> wrote:
>> > Matt,
>> >
>> > Great response, this does help explain a lot.  Reading through your
>> > post made me realize I didn't understand the AvroSchemaRegistry.  I
>> > had been thinking it was something that nifi processors populated,
>> > but I re-read
>> its
>> > usage description and it does indeed say to use dynamic properties
>> > for
>> the
>> > schema name / value.  In that case, I can definitely see how this
>> > is not dynamic in the sense of inferring any schemas on the flow.  
>> > It makes me wonder if there would be a way to populate the schema
>> > registry from flow files.  When I first glanced at the processors,
>> > I had assumed that when
>> the
>> > schema was inferred from the CSV headers, it would create an entry
>> > in the AvroSchemaRegistry, provided you filled in the correct properties.
>> Clearly
>> > this is not how it works.
>> >
>> > Just so I understand, does your first paragraph mean that even if
>> > you use the CSV headers to determine the schema, you still can't
>> > use the *Record processors unless you manually register a matching
>> > schema in the schema registry, or otherwise somehow set the schema
>> > in an attribute?  In this case, it almost seems like inferring the
>> > schema from the CSV headers
>> serves
>> > no purpose, and I don't see how NIFI-3921 would alleviate that (it
>> > only appears to address avro flow files with embedded schema).
>> >
>> > Based on this understanding, I was able to successfully get the
>> > following flow working:
>> > InferAvroSchema -> QueryRecord.
>> >
>> > QueryRecord uses CSVReader with "Use Schema Text Property" and
>> > Schema
>> Text
>> > set to ${inferred.avro.schema} (which is populated by the
>> > InferAvroSchema processor).  It also uses JsonRecordSetWriter with
>> > a similar configuration.  I could attach a template, but I don't
>> > know the best way
>> to
>> > do that on the listserv.
>> >
>> > Joe
>> >
>> > On Fri, May 19, 2017 at 4:59 PM, Matt Burgess
>> > <[hidden email]>
>> wrote:
>> >
>> >> Joe,
>> >>
>> >> Using the CSV Headers to determine the schema is currently the
>> >> only "dynamic" schema strategy, so it will be tricky to use with
>> >> the other Readers/Writers and associated processors (which require
>> >> an explicit schema). This should be alleviated with NIFI-3921 [1].
>> >> For this first release, I believe the approach would be to
>> >> identify the various schemas for your incoming/outgoing data,
>> >> create a Schema Registry with all of them, then configure the various Record Readers/Writers to use them.
>> >>
>> >> There were some issues during development related to using the
>> >> incoming vs. outgoing schema for various record operations; if
>> >> QueryRecord seems to be using the output schema for querying, then
>> >> it is likely a bug. However, in this case it might just be that you
>> >> need an explicit schema for your Writer that matches the input
>> >> schema (which is inferred from the CSV header). The CSV Header
>> >> inference currently assumes all fields are Strings, so a nominal
>> >> schema would have the same number of fields as columns, each with
>> >> type String. If you don't know the number of columns and/or the
>> >> column names are dynamic per CSV file, I believe we have a gap here (for now).
>> >>
>> >> I thought of maybe having an InferRecordSchema processor that would
>> >> fill in the avro.text attribute for use in various downstream
>> >> record readers/writers, but inferring schemas in general is not a
>> >> trivial task. An easier interim solution might be to have an
>> >> AddSchemaAsAttribute processor, which takes a Reader to parse the
>> >> records and determine the schema (whether dynamic or static), and
>> >> set the avro.text attribute on the original incoming flow file,
>> >> then transfer the original flow file. This would require two
>> >> reads, one by AddSchemaAsAttribute and one by the downstream
>> >> record processor, but it should be fairly easy to implement.  Then
>> >> again, since new features would go into 1.3.0, hopefully NIFI-3921
>> >> will be implemented by then, rendering all this moot :)
>> >>
>> >> Regards,
>> >> Matt
>> >>
>> >> [1] https://issues.apache.org/jira/browse/NIFI-3921
>> >>
>> >> On Fri, May 19, 2017 at 12:25 PM, Joe Gresock <[hidden email]>
>> wrote:
>> >> > I've tried a couple different configurations of CSVReader /
>> >> > JsonRecordSetWriter with the QueryRecord processor, and I don't
>> >> > think
>> I
>> >> > quite have the usage down yet.
>> >> >
>> >> > Can someone give a basic example configuration in the following
>> >> > 2 scenarios?  I followed most of Matt Burgess's response to the
>> >> > post
>> titled
>> >> > "How to use ConvertRecord Processor", but I don't think it tells
>> >> > the
>> >> whole
>> >> > story.
>> >> >
>> >> > 1) QueryRecord, converting CSV to JSON, using only the CSV
>> >> > headers to determine the schema.  (I tried selecting Use String
>> >> > Fields from
>> Header
>> >> in
>> >> > CSVReader, but the processor really seems to want to use the
>> >> > JsonRecordSetWriter to determine the schema)
>> >> >
>> >> > 2) QueryRecord, converting CSV to JSON, using a cached avro
>> >> > schema.  I imagine I need to use InferAvroSchema here, but I'm
>> >> > not sure how to
>> cache
>> >> > it in the AvroSchemaRegistry.  Also not quite sure how to
>> >> > configure
>> the
>> >> > properties of each controller service in this case.
>> >> >
>> >> > Any help would be appreciated.
>> >> >
>> >> > Joe
>> >> >
>> >> > --
>> >> > I know what it is to be in need, and I know what it is to have
>> plenty.  I
>> >> > have learned the secret of being content in any and every
>> >> > situation, whether well fed or hungry, whether living in plenty
>> >> > or in want.  I
>> can
>> >> do
>> >> > all this through him who gives me strength.    *-Philippians 4:12-13*
>> >>
>> >
>> >
>> >
>> > --
>> > I know what it is to be in need, and I know what it is to have
>> > plenty.  I have learned the secret of being content in any and
>> > every situation, whether well fed or hungry, whether living in
>> > plenty or in want.  I can
>> do
>> > all this through him who gives me strength.    *-Philippians 4:12-13*
>>
>
>
>
> --
> I know what it is to be in need, and I know what it is to have plenty.  
> I have learned the secret of being content in any and every situation,
> whether well fed or hungry, whether living in plenty or in want.  I can do
> all this through him who gives me strength.    *-Philippians 4:12-13*