Partitioning from actual Data (FlowFile) in NiFi


Partitioning from actual Data (FlowFile) in NiFi

Anshuman Ghosh
Hello everyone,

It would be great if you could help me implement this use case:

   - Is there any way (NiFi processor) to use *an attribute (field/ column)
   value for partitioning* when writing the final FlowFile to HDFS/ other
   storage?
   - Earlier we were using the simple system date (
   */year=${now():format('yyyy')}/month=${now():format('MM')}/day=${now():format('dd')}/*)
   for this, but that doesn't make sense when we consume old data from Kafka
   and want to partition on the original date (a date field inside the Kafka message).


Thank you!

______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*

Re: Partitioning from actual Data (FlowFile) in NiFi

Joe Witt
Anshuman

Hello.  Please avoid directly addressing specific developers and
instead just address the mailing list you need (dev or user).

If your data is CSV, for example, you can use RouteText to efficiently
partition the incoming sets by matching field/column values; in doing so
you'll have the FlowFile attribute you need for each group.  Then you can
merge FlowFiles with like attributes together with MergeContent, and when
writing to HDFS you can use that attribute value in the directory path.
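
A minimal sketch of how those pieces could be wired together, assuming a
hypothetical attribute named "csv.date" carrying the matched column value
(the exact attribute RouteText surfaces depends on how its grouping is
configured, so treat these names as placeholders):

RouteText      group incoming lines by the value of the date column
MergeContent   Correlation Attribute Name = csv.date    (bins FlowFiles sharing the same date)
PutHDFS        Directory = /data/events/date=${csv.date}/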

With the new record reader/writer capabilities in Apache NiFi 1.2.0 we can
now provide a record-oriented PartitionRecord processor, which will also let
you easily apply this pattern to all kinds of formats/schemas in a nice,
clean way.
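
For reference, a rough sketch of how such a record-oriented partitioning step
is typically configured once PartitionRecord lands (the field name
"event_date" and the reader/writer choices are illustrative, not taken from
this thread):

PartitionRecord
  Record Reader  = JsonTreeReader (or CSVReader / AvroReader)
  Record Writer  = a matching RecordSetWriter
  event_date     = /event_date     (user-defined property; the value is a RecordPath)

Each outgoing FlowFile then carries an "event_date" attribute that MergeContent
and PutHDFS can reuse, e.g. Directory = /data/date=${event_date}/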

Joe


Re: Partitioning from actual Data (FlowFile) in NiFi

Anshuman Ghosh
Hello Joe,

Apologies for the inconvenience; I will keep that in mind going forward!

Thank you for your suggestion :-)
We have recently built NiFi from the master branch, so it should be similar
to 1.2.0.
We receive data in JSON format and then convert it to Avro before writing to
HDFS.
The date field here is a 19-digit Unix timestamp (bigint).

It would be really great if you could help a bit with how we can achieve the
same with Avro here.
Thank you in advance!


______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*



Re: Partitioning from actual Data (FlowFile) in NiFi

Bryan Bende
If your data is JSON, then you could extract the date field from the
JSON before you convert to Avro by using EvaluateJsonPath.
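
A minimal sketch of that extraction step, assuming a hypothetical JSON field
named "RequestTimestamp" (the JsonPath and attribute name are placeholders):

EvaluateJsonPath
  Destination = flowfile-attribute
  time        = $.RequestTimestamp     (user-defined property; the value is a JsonPath)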

From there, let's say you have an attribute called "time" with the Unix
timestamp; you could use an UpdateAttribute processor to create
attributes for each part of the timestamp:

time.year = ${time:format("yyyy", "GMT")}
time.month = ${time:format("MM", "GMT")}
time.day = ${time:format("dd", "GMT")}

Then in PutHDFS you can do something similar to what you were already doing:

/year=${time.year}/month=${time.month}/day=${time.day}/
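
If you also want to bin files by day before writing (as Joe suggested with
MergeContent), one option, using an illustrative attribute name, is to build a
single date attribute in the same UpdateAttribute step and correlate on it:

time.date = ${time:format("yyyy-MM-dd", "GMT")}

MergeContent   Correlation Attribute Name = time.date
PutHDFS        Directory = /year=${time.year}/month=${time.month}/day=${time.day}/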

As Joe mentioned, there are a bunch of new record reader/writer related
capabilities in 1.2.0, and there is a follow-up JIRA to add a "record
path" feature which would allow you to extract a value (like your date field)
from any data format.


Re: Partitioning from actual Data (FlowFile) in NiFi

Joe Witt
Cool.  Bryan offers a good approach for now, and this JIRA captures a
really powerful way to do it going forward:
https://issues.apache.org/jira/browse/NIFI-3866

Thanks
Joe


Re: Partitioning from actual Data (FlowFile) in NiFi

Anshuman Ghosh
Thank you so much Bryan :-)
It is working fine now with the following workflow:

*Consume from Kafka ==>*
*Evaluate JSON path (Timestamp) ==>*
*Update Attribute to get year, month and day; since we receive a 19-digit
long Timestamp value, we had to use the following trick
(**${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy",
"GMT")}**) ==> Convert JSON to Avro ==>*
*Merge Content on a shared Attribute (Timestamp - Date) ==>*
*Write the merged FlowFile onto Google Cloud Storage (GCS) buckets*
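
For reference, a sketch of the UpdateAttribute properties behind that step
(attribute names are illustrative; substring(0,13) keeps the first 13 digits
of the 19-digit value, i.e. millisecond precision, so Expression Language can
format it):

time.year  = ${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy", "GMT")}
time.month = ${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("MM", "GMT")}
time.day   = ${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("dd", "GMT")}
time.date  = ${Click.RequestTimestamp:toString():substring(0,13):toNumber():format("yyyy-MM-dd", "GMT")}

MergeContent can then correlate on time.date, and the GCS writer's path can
use something like /year=${time.year}/month=${time.month}/day=${time.day}/.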

Let me know whether it can be further improved.
Also, will it be okay to use a "*CompressContent*" processor right after the
merge step?

Thanking you in advance!


______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*




Re: Partitioning from actual Data (FlowFile) in NiFi

Andrew Grande
Yes to compressing. The output of the merge step is a larger piece of data,
no bigger or older than what the merge step is configured for. It can produce
smaller partial bins if it is configured with a max bin age.
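
A rough sketch of the relevant MergeContent knobs and the follow-on
compression step (the values are placeholders to tune for your volumes, and
time.date is the illustrative correlation attribute from earlier):

MergeContent
  Merge Format               = Avro            (since the content is Avro)
  Correlation Attribute Name = time.date
  Minimum Number of Entries  = 1000
  Maximum Number of Entries  = 100000
  Max Bin Age                = 10 min          (flushes partial bins after this long)

CompressContent
  Mode               = compress
  Compression Format = gzip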

Andrew


Re: Partitioning from actual Data (FlowFile) in NiFi

Anshuman Ghosh
Thank you so much for the reply!

We have one more question regarding the whole flow:

   - The entire flow described above has been encapsulated inside a
   template for export and import across different environments.
   - Let's say one version of the flow is currently running in PROD.
   - After making some minor changes, a new version of the flow has been
   imported into PROD.
   - Before starting the new version of the flow, we need to stop the old
   version; *otherwise there will be a problem with Kafka consumption from
   the same topic*.
   - So if we stop the old version of the flow and then start the new
   version, it will be fine!
   - However, there will still be some FlowFiles in the queues waiting to be
   processed (for example, at MergeContent). How can we process them
   completely? How can we automate flushing/processing all remaining FlowFiles
   before actually stopping the old version and marking it as retired?
   - Find the rough flow outline below as well

*Flow outline*
1. Consume from Kafka
2. Evaluate JSON Path
3. Update attribute to get year, month, day from a Timestamp
4. Convert JSON to Avro
5. Merge content based on the date attribute
6. Write to GCS
7. Refresh partitions


Thanking you in advance!

______________________

*Kind Regards,*
*Anshuman Ghosh*
*Contact - +49 179 9090964*

