Sliding windows

Sliding windows

Craig Knell
Hi Folks

We have a stream of data that I need to window into 5-minute windows, with the window sliding every 3 minutes. Each minute of data is 1 MB, so I have to deliver 5 MB every 3 minutes.

What is the best way of achieving this in NiFi?
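
To make the requirement concrete, here is a minimal sketch of the window arithmetic, assuming windows are aligned to minute 0 and the rate is exactly 1 MB per minute. The function name `windows` and the constants are illustrative only, not part of NiFi.

```python
from typing import Iterator, Tuple

WINDOW_MIN = 5   # window length from the question
SLIDE_MIN = 3    # slide interval from the question
MB_PER_MIN = 1   # data rate from the question


def windows(start_min: int, count: int) -> Iterator[Tuple[int, int]]:
    """Yield (begin, end) minute bounds, end exclusive, for successive windows."""
    for i in range(count):
        begin = start_min + i * SLIDE_MIN
        yield begin, begin + WINDOW_MIN


first_three = list(windows(0, 3))

# Each window spans 5 minutes of data at 1 MB per minute.
window_mb = WINDOW_MIN * MB_PER_MIN

# Consecutive windows share the minutes that the slide does not skip.
overlap_min = WINDOW_MIN - SLIDE_MIN

print(first_three, window_mb, overlap_min)
```

Under these assumptions every delivery repeats the last 2 minutes of the previous one, so whatever builds the window has to retain data after it has been delivered once.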

Best regards

Craig

RE: [EXT] Sliding windows

Peter Wicks (pwicks)
Craig,

If you have a timestamp set as an attribute on each FlowFile, then this is kind of possible.

Use a regular MergeContent processor with "Maximum Group Size" set to 1 MB and "Max Bin Age" set to 3 min. You may need to tweak these to get the right cadence, but they are generally the settings you need to touch. Use the "Merged" relationship for whatever you need. To create the window, pass the "Original" relationship to a RouteOnAttribute processor.

In the RouteOnAttribute processor, use NiFi Expression Language to calculate how old the FlowFile is (using the timestamp attribute I mentioned). If the FlowFile is older than x, drop it; otherwise send it back to the MergeContent processor.

Using this process, it should be easy to get a 5 min rolling window (drop any FlowFile older than 5 min in RouteOnAttribute).
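
The age check described above can be sketched as follows. This is a plain Python model of the routing decision, not NiFi code; the function `route`, the relationship names `drop` and `loop_back`, and the epoch-millisecond timestamps are assumptions made for illustration. In a real flow the same comparison would be written in NiFi Expression Language, probably along the lines of `${now():toNumber():minus(${window.timestamp}):gt(300000)}`, where `window.timestamp` is a hypothetical attribute name.

```python
WINDOW_MS = 5 * 60 * 1000  # 5-minute window, in milliseconds


def route(flowfile_ts_ms: int, now_ms: int, window_ms: int = WINDOW_MS) -> str:
    """Return 'drop' if the FlowFile is older than the window, else 'loop_back'."""
    age_ms = now_ms - flowfile_ts_ms
    return "drop" if age_ms > window_ms else "loop_back"


# Model one pass at minute 10 over FlowFiles stamped at minutes 3 through 9.
now_ms = 10 * 60 * 1000
stamps = {f"minute-{m}": m * 60 * 1000 for m in range(3, 10)}
decisions = {name: route(ts, now_ms) for name, ts in stamps.items()}

for name, decision in decisions.items():
    print(name, decision)
```

FlowFiles that loop back are merged again on the next bin, which is what produces the overlap between consecutive windows.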

I don't know that this perfectly answers what you asked, but does it give you a good direction to investigate?

Thanks,
  Peter

-----Original Message-----
From: Craig Knell <[hidden email]>
Sent: Tuesday, June 4, 2019 1:32 AM
To: [hidden email]
Subject: [EXT] Sliding windows


Re: [EXT] Sliding windows

Joe Witt
...From the description it isn't clear what you're trying to achieve, so
let's first try to expand the detail on the use case.

We should distinguish whether you're wanting to 'combine various objects in
a data stream together on some time bound' from 'processing various objects
in a data stream to make some observation over some time bound'.

If you're wanting to merge data together to make a larger object comprised
of those smaller objects then MergeContent is your friend.

If you're wanting to look at a given stream or set of streams at once and
make a time-window-based observation over that data, then I recommend
looking at something like Apache Flink, which is purpose-built for that and
should be better than NiFi at that part.  If it is a pretty
straightforward single-stream window evaluation and you want to avoid having
another system in play, then I'd just write a little custom processor in
NiFi for your case.  Once you have a more complex data distribution and
processing requirement and you want a powerful low-latency combination, I'd
say put NiFi, Kafka, and Flink together for a pretty hard-to-beat combo.
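
The two cases distinguished above can be illustrated with a small sketch over the same window. This is plain Python, not NiFi or Flink code; the `Event` shape and the functions `in_window`, `merge`, and `observe` are hypothetical names chosen for the example.

```python
from typing import List, Tuple

Event = Tuple[int, bytes]  # (event time in minutes, payload)


def in_window(events: List[Event], begin: int, end: int) -> List[Event]:
    """Select events whose time falls in [begin, end)."""
    return [e for e in events if begin <= e[0] < end]


def merge(events: List[Event]) -> bytes:
    """Case 1: combine the objects into one larger object, in time order."""
    ordered = sorted(events, key=lambda e: e[0])
    return b"".join(payload for _, payload in ordered)


def observe(events: List[Event]) -> int:
    """Case 2: make an observation over the window, here total bytes seen."""
    return sum(len(payload) for _, payload in events)


# One single-byte payload per minute: b"A" at minute 0 up to b"H" at minute 7.
stream = [(m, bytes([65 + m])) for m in range(8)]
selected = in_window(stream, 3, 8)
merged = merge(selected)
total_bytes = observe(selected)
print(merged, total_bytes)
```

The first case produces a new object to deliver downstream, while the second produces a value derived from the window, which is the part a stream processor is designed for.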

Thanks

On Tue, Jun 4, 2019 at 9:51 AM Peter Wicks (pwicks) <[hidden email]>
wrote:


Re: [EXT] Sliding windows

Craig Knell
Joe

Thanks for the options.  

The Kafka/Flink option is interesting but too big at the moment.

I’ll build a custom processor for this occasion.  

Much appreciated!

Craig Knell


> On 4 Jun 2019, at 21:56, Joe Witt <[hidden email]> wrote:

Re: [EXT] Sliding windows

Michal Klempa
Hi Craig,
I was doing something similar a couple of years ago. I built a custom
processor and found out that the problem is much bigger than I
thought.
In a custom processor, onTrigger is called from different threads,
in parallel. So you have to maintain synchronized or concurrent data
structures in the processor itself if you want to achieve stateful
stream processing (my case was even simpler: just calculating the 'price
change' between various events).
Of course, the processor became the bottleneck, and the order of events
(FlowFiles) was not guaranteed by NiFi: files may be penalized in
previous processors, which changes their order.
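
The two problems above (concurrent callers and out-of-order arrival) can be sketched in plain Python. This is an illustration of the idea, not a NiFi processor; the class `SlidingWindowBuffer`, its methods, and the sample prices are all hypothetical. A lock guards the shared state, and events are sorted by their own timestamp rather than by arrival order before the 'price change' is computed.

```python
import threading
from typing import List, Tuple


class SlidingWindowBuffer:
    """Thread-safe buffer that keeps only events newer than now - window."""

    def __init__(self, window: int) -> None:
        self._window = window
        self._lock = threading.Lock()
        self._events: List[Tuple[int, float]] = []  # (event time, price)

    def add(self, ts: int, value: float) -> None:
        # May be called from several threads at once, so guard the list.
        with self._lock:
            self._events.append((ts, value))

    def snapshot(self, now: int) -> List[Tuple[int, float]]:
        """Evict expired events and return the rest in event-time order."""
        with self._lock:
            cutoff = now - self._window
            self._events = [e for e in self._events if e[0] > cutoff]
            return sorted(self._events)


buf = SlidingWindowBuffer(window=6)

# Events arrive out of order and from different threads.
arrivals = [(4, 10.5), (2, 10.0), (3, 10.2), (9, 11.0), (7, 10.8)]
threads = [threading.Thread(target=buf.add, args=a) for a in arrivals]
for t in threads:
    t.start()
for t in threads:
    t.join()

ordered = buf.snapshot(now=9)
# Price change between consecutive events, computed in event-time order.
changes = [round(b[1] - a[1], 2) for a, b in zip(ordered, ordered[1:])]
print(ordered, changes)
```

A single lock keeps the sketch simple but serializes every caller, which matches the bottleneck described above.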

All in all, I ended up learning Apache Flink and connecting Flink to NiFi.
You can try setting up your Flink environment:
https://training.ververica.com/devEnvSetup.html
And then connect it to NiFi:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/nifi.html


You don't need Kafka at all (we are not using it in simple cases).
Best, Michal

On Wed, Jun 5, 2019 at 3:26 AM Craig Knell <[hidden email]> wrote:
