End of stream?


End of stream?

Matt Burgess
Does NiFi have the concept of an "end of stream" or is it designed to pretty
much always be running? For example if I use a GetFile processor pointing at
a single directory (with remove files = true), once all the files have been
processed, can downstream processors know that?

I'm working on a ReservoirSampling processor, and I have it successfully
building the reservoir from all incoming FlowFiles. However it never gets to
the logic that sends the sampled FlowFiles to the downstream processor (just
a PutFile at this point). I have the logic in a block like:

FlowFile flowFile = session.get();
if (flowFile == null) {
    // send reservoir
} else {
    // build reservoir
}

But the if-clause never gets entered.  Is there a different approach and/or
am I misunderstanding how the data flow works?
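For reference, the reservoir-building step itself (classic Algorithm R) can be sketched in plain Java, independent of the NiFi API. This is a minimal illustration of the sampling logic, not the actual processor code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Classic reservoir sampling (Algorithm R): maintains a uniform random
// sample of size k from a stream of unknown length.
class Reservoir<T> {
    private final int k;
    private final List<T> sample = new ArrayList<>();
    private final Random rng;
    private long seen = 0;

    Reservoir(int k, Random rng) {
        this.k = k;
        this.rng = rng;
    }

    void add(T item) {
        seen++;
        if (sample.size() < k) {
            sample.add(item);                        // fill phase: keep everything
        } else {
            long j = (long) (rng.nextDouble() * seen); // uniform index in [0, seen)
            if (j < k) {
                sample.set((int) j, item);           // replace with probability k/seen
            }
        }
    }

    List<T> sample() {
        return sample;
    }
}
```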

Thanks in advance,
Matt



Re: End of stream?

Joe Witt
Matt,

For processors in the middle of a flow, the null check guards against a race condition: the processor is told it can run, but by the time it does there are no FlowFiles left. The framework generally avoids this by checking whether there is work to do. So, in short, you can't use that mechanism to know there are no items left to process.

The only way to know that a given FlowFile was the last in a bunch would be for that fact to be carried as an attribute on the FlowFile itself.

There is really no concept of an end of stream from a processor's perspective. Processors are either running or not running. You can, as I mentioned, use attributes of FlowFiles to annotate their relative position in a stream.
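As a rough sketch of that idea, the upstream processor could stamp each item with its position and the batch size, and the downstream processor could recognize "last" when index == size - 1. The attribute names here ("batch.index", "batch.size") are illustrative assumptions, not a NiFi convention, and a plain Map stands in for FlowFile attributes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: annotate relative position in a stream via attributes.
class BatchPosition {
    // Upstream: stamp each of `batchSize` items with its index and the total.
    static List<Map<String, String>> stamp(int batchSize) {
        List<Map<String, String>> flowFiles = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map<String, String> attrs = new HashMap<>();
            attrs.put("batch.index", Integer.toString(i));
            attrs.put("batch.size", Integer.toString(batchSize));
            flowFiles.add(attrs);
        }
        return flowFiles;
    }

    // Downstream: an item is "last" when its index is size - 1.
    static boolean isLast(Map<String, String> attrs) {
        return Integer.parseInt(attrs.get("batch.index"))
                == Integer.parseInt(attrs.get("batch.size")) - 1;
    }
}
```

Note this only works if the batch is not reordered or filtered in between; see the prioritization caveat later in the thread.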

Does that help explain it at all or did I make it more confusing?

Thanks
Joe

On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess <[hidden email]> wrote:

> Does NiFi have the concept of an "end of stream" or is it designed to pretty
> much always be running? For example if I use a GetFile processor pointing at
> a single directory (with remove files = true), once all the files have been
> processed, can downstream processors know that?
>
> I'm working on a ReservoirSampling processor, and I have it successfully
> building the reservoir from all incoming FlowFiles. However it never gets to
> the logic that sends the sampled FlowFiles to the downstream processor (just
> a PutFile at this point). I have the logic in a block like:
>
> FlowFile flowFile = session.get();
> if(flowFile == null) {
>   // send reservoir
> }
> else {
>  // build reservoir
> }
>
> But the if-clause never gets entered.  Is there a different approach and/or
> am I misunderstanding how the data flow works?
>
> Thanks in advance,
> Matt
>
>
Reply | Threaded
Open this post in threaded view
|

Re: End of stream?

Matt Burgess
No that makes sense, thanks much!

So for my case, I'm thinking I'd want another attribute from GetFile, called
"lastInStream" or something? It would be set once processing of the current
directory is complete (for the time being), and reset each time onTrigger is
called. At that point it's really more of a "lastInBatch", so maybe instead I
could use the batch size somehow as a hint to the ReservoirSampling processor
that the current reservoir is ready to send along? The use case is a kind of
burst processing (or per-batch filtering): FlowFiles arrive in "groups", and I
could sample from the incoming group with equal probability to produce a
smaller output group.
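That "batch size as a hint" idea can be sketched minimally: the sampler counts arrivals and reports the reservoir complete after a known, expected batch size. This is an assumed design sketch, not existing NiFi code, and it only works when the batch size is known up front:

```java
// Sketch: use a known batch size as the signal that the current
// reservoir is complete and ready to send downstream.
class CountingSampler {
    private final int expectedBatchSize;
    private int seenInBatch = 0;

    CountingSampler(int expectedBatchSize) {
        this.expectedBatchSize = expectedBatchSize;
    }

    // Returns true when the current batch is complete; the counter then
    // resets so the next batch starts fresh.
    boolean onItem() {
        seenInBatch++;
        if (seenInBatch == expectedBatchSize) {
            seenInBatch = 0;
            return true;
        }
        return false;
    }
}
```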



Re: End of stream?

Michael Moser
Matt,

There is the MonitorActivity processor, which "Monitors the flow for
activity and sends out an indicator when the flow has not had any data for
some specified amount of time and again when the flow's activity is
restored".  You could look at how MonitorActivity is coded to get ideas for
how your ReservoirSampling processor can do what you need.
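The core of that inactivity-detection approach can be sketched in a few lines: flush the reservoir once no new items have been seen for some threshold. This is an illustration of the idea, not MonitorActivity's actual implementation, and the threshold name is an assumption:

```java
// Sketch of the MonitorActivity idea applied to a sampler: consider the
// current group finished (and flush the reservoir) once no new items
// have arrived for `idleThresholdMillis`.
class InactivityFlush {
    private final long idleThresholdMillis;
    private long lastActivity;

    InactivityFlush(long idleThresholdMillis, long nowMillis) {
        this.idleThresholdMillis = idleThresholdMillis;
        this.lastActivity = nowMillis;
    }

    // Call whenever a new item arrives.
    void recordActivity(long nowMillis) {
        lastActivity = nowMillis;
    }

    // True once the idle threshold has elapsed with no activity.
    boolean shouldFlush(long nowMillis) {
        return nowMillis - lastActivity >= idleThresholdMillis;
    }
}
```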

-- Mike


On Fri, Nov 6, 2015 at 11:49 AM, Matthew Burgess <[hidden email]>
wrote:

Re: End of stream?

Joe Witt
Also meant to reply back on this earlier...

It would be a reasonable JIRA to add logic to GetFile that sets an
attribute signaling a given FlowFile was sourced from the 'last file
left' in a given directory or source. However, it is somewhat tricky:
when is something actually considered the last? Also of note, data
could be prioritized after GetFile, in which case you'd really not know
whether you're dealing with the last one, the first one, or anything in
between. We'd really need GetFile to stamp a timestamp and sequence id
on each FlowFile, or something along those lines. Hmmm.

Given what you're trying to do, could this logic of sampling groups
around some time interval instead simply be part of that processor?
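That time-interval grouping could look something like the following sketch: bucket items by timestamp into fixed windows, and treat an item landing in a new window as the signal that the previous group is complete. This is an assumed illustration of the idea, not a proposed NiFi API:

```java
// Sketch: group items into fixed time windows. When an item falls into
// a new window, the previous window's group is complete and its
// reservoir can be sampled and sent along.
class TimeWindowGrouper {
    private final long windowMillis;
    private long currentWindow = Long.MIN_VALUE;

    TimeWindowGrouper(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Returns true if this timestamp starts a new window, meaning the
    // previous group (if any) is finished.
    boolean startsNewWindow(long timestampMillis) {
        long window = Math.floorDiv(timestampMillis, windowMillis);
        boolean isNew = window != currentWindow;
        currentWindow = window;
        return isNew;
    }
}
```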

Thanks
Joe

On Fri, Nov 6, 2015 at 9:48 PM, Michael Moser <[hidden email]> wrote:

> Matt,
>
> There is the MonitorActivity processor, which "Monitors the flow for
> activity and sends out an indicator when the flow has not had any data for
> some specified amount of time and again when the flow's activity is
> restored".  You could look at how MonitorActivity is coded to get ideas for
> how your ReservoirSampling processor can do what you need.
>
> -- Mike
>
>
> On Fri, Nov 6, 2015 at 11:49 AM, Matthew Burgess <[hidden email]>
> wrote:
>
>> No that makes sense, thanks much!
>>
>> So for my case, I'm thinking I'd want another attribute from GetFile called
>> "lastInStream" or something? It would be set once processing of the current
>> directory is complete (for the time being), and reset each time the
>> onTrigger is called.  At that point it's really more of a "lastInBatch", so
>> maybe instead I could use the batch size somehow as a hint to the
>> ReservoirSampling processor that the current reservoir is ready to send
>> along?  The use case is a kind of burst processing (or per-batch
>> filtering),
>> where FlowFiles are available in "groups", where I could sample from the
>> incoming group with equal probability to give a smaller output group.
>>
>>
>> From:  Joe Witt <[hidden email]>
>> Reply-To:  <[hidden email]>
>> Date:  Friday, November 6, 2015 at 11:38 AM
>> To:  <[hidden email]>
>> Subject:  Re: End of stream?
>>
>> Matt,
>>
>> For processors in the middle of the flow the null check is important
>> for race conditions where it is told it can run but by the time it
>> does there are no flowfiles left.  The framework though in general
>> will avoid this because it is checking if there is work to do.  So, in
>> short you can't use that mechanism to know there are no items left to
>> process.
>>
>> The only way to know that a given flowfile was the last in a bunch
>> would be for that fact to be an attribute on a given flow file.
>>
>> There is really no concept of an end of stream so to speak from a
>> processor perspective.  Processors are either running on not running.
>> You can, as i mentioned before though, use attributes of flowfiles to
>> annotate their relative position in a stream.
>>
>> Does that help explain it at all or did I make it more confusing?
>>
>> Thanks
>> Joe
>>
>> On Fri, Nov 6, 2015 at 11:32 AM, Matthew Burgess <[hidden email]>
>> wrote:
>> >  Does NiFi have the concept of an "end of stream" or is it designed to
>> pretty
>> >  much always be running? For example if I use a GetFile processor
>> pointing at
>> >  a single directory (with remove files = true), once all the files have
>> been
>> >  processed, can downstream processors know that?
>> >
>> >  I'm working on a ReservoirSampling processor, and I have it successfully
>> >  building the reservoir from all incoming FlowFiles. However it never
>> gets to
>> >  the logic that sends the sampled FlowFiles to the downstream processor
>> (just
>> >  a PutFile at this point). I have the logic in a block like:
>> >
>> >  FlowFile flowFile = session.get();
>> >  if(flowFile == null) {
>> >    // send reservoir
>> >  }
>> >  else {
>> >   // build reservoir
>> >  }
>> >
>> >  But the if-clause never gets entered.  Is there a different approach
>> and/or
>> >  am I misunderstanding how the data flow works?
>> >
>> >  Thanks in advance,
>> >  Matt
>> >
>> >
>>
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: End of stream?

Matt Burgess
In reply to this post by Michael Moser
Sounds very promising, thank you!! I'll share what I find out :)

Are there other group-related use cases? Maybe some non-incremental statistical measures?

Regards,
Matt

Sent from my iPhone

> On Nov 6, 2015, at 9:48 PM, Michael Moser <[hidden email]> wrote: