Reading all flowfiles queued for a processor (>20000 flowfiles)


TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1
Hi NiFi devs,

My understanding is that when I create a custom processor and get FlowFiles with session.get(CustomFlowFileFilter), the filter is only ever offered at most 20000 flowfiles, no matter how many times it returns FlowFileFilterResult.ACCEPT_AND_CONTINUE or FlowFileFilterResult.REJECT_AND_CONTINUE. The 20000 comes from the nifi.queue.swap.threshold setting in nifi.properties.
(Disregarding that it's actually 19999, and that the setting is not respected when running tests, which made this SUPER confusing to debug...)
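
To make the behaviour concrete, here is a toy model in plain Java. It is only a sketch of what I'm observing, not NiFi's actual queue code: SwapThresholdModel, its get method, and the FilterResult enum are stand-ins I made up for illustration, and flowfiles are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SwapThresholdModel {

    // Stand-in for NiFi's FlowFileFilterResult, reduced to the two values
    // mentioned above. This is NOT the real NiFi enum.
    public enum FilterResult { ACCEPT_AND_CONTINUE, REJECT_AND_CONTINUE }

    // Hypothetical model of session.get(filter): the filter is only offered
    // the first `swapThreshold` queued items. Anything queued beyond that is
    // treated as swapped out and never reaches the filter.
    public static List<String> get(List<String> queued, int swapThreshold,
                                   Function<String, FilterResult> filter) {
        List<String> accepted = new ArrayList<>();
        int visible = Math.min(queued.size(), swapThreshold);
        for (int i = 0; i < visible; i++) {
            String flowFile = queued.get(i);
            if (filter.apply(flowFile) == FilterResult.ACCEPT_AND_CONTINUE) {
                accepted.add(flowFile);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> queued = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            queued.add("ff-" + i);
        }
        // ff-3 and ff-22 form a pair, but with a threshold of 20 the filter
        // never sees ff-22, so the pair cannot be completed in one call.
        List<String> got = get(queued, 20, ff ->
                ff.equals("ff-3") || ff.equals("ff-22")
                        ? FilterResult.ACCEPT_AND_CONTINUE
                        : FilterResult.REJECT_AND_CONTINUE);
        System.out.println(got); // prints [ff-3]
    }
}
```

With a threshold larger than the queue, the same filter would return both halves of the pair, which is why raising the threshold only postpones the problem.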

Attached a screenshot of that happening (also at: https://i.imgur.com/25QJxuj.png)

My question is: is there a way to force a custom processor to be able to read ALL queued flowfiles in all incoming connections?

My particular use case is pairing flowfiles. There are probably other ways to pair files using Wait/Notify processors, but I'm handling files at high throughput, with possible delays between the pairs arriving, and it's quite easy to hit the limit. I could also increase the swap threshold setting, but I keep hitting the problem. I've also played with custom prioritizers on connections in an attempt to maximise the chance of pairs occurring, but because I need to move unmatched flowfiles out and back in, that essentially creates a busy loop. Seems like there should be a better way.

Any ideas?

Ideally, a way to force a custom processor to be able to read all queued flowfiles (swapping more than the threshold into memory, during a single OnTrigger call) would be the easiest solution. Is there one?

Cheers,
Sam



Re: Reading all flowfiles queued for a processor (>20000 flowfiles)

Mark Payne
Hi Sam,

There are a couple of ways to tackle this problem. My recommendation would be to look at extending the BinFiles processor.
This is an abstract class, which MergeContent extends (and I think 1 or 2 other processors?). Its job is to bin 'like flowfiles' together,
and it can take care of pulling data from queues and efficiently binning the FlowFiles together. It is important, though, to keep in mind
that FlowFiles contain attribute maps, and those can quickly exhaust your heap when you're trying to hold tens or hundreds of thousands
of FlowFiles in a single processor.

Thanks
-Mark


On Aug 22, 2018, at 8:07 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <[hidden email]> wrote:

Re: Reading all flowfiles queued for a processor (>20000 flowfiles)

TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1
Neat! So the BinFiles processor doesn't quite have the control over the
binning/pairing that I wanted, but it got me on the right track. I think
I've got a pretty lightweight custom processor that does what I'm looking
for now.

I made a processor which extends AbstractSessionFactoryProcessor and holds
an AtomicReference<ProcessSession>, initialized on its first onTrigger,
plus a HashMap. Now, each onTrigger creates a session as normal and
migrates flowfiles to the ProcessSession the processor holds onto. It then
does the matching and migrates matched flowfiles back to the 'current'
ProcessSession, which combines them and transfers them on their merry
way. There's the potential for blowing the heap now, but I think I can put
some controls in place to manage that.
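
A minimal sketch of the matching bookkeeping, in plain Java with no NiFi
dependencies. PairingBuffer, maxHeld and hasCapacity are hypothetical names
for illustration only; in the real processor the held values would be
FlowFiles owned by the held session and moved with ProcessSession.migrate,
not strings in a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class PairingBuffer {

    // Unmatched items, keyed by whatever identifies a pair. In the processor
    // described above these would be FlowFiles owned by the held session.
    private final Map<String, String> held = new HashMap<>();

    // Hypothetical cap on held items, one possible control to protect the heap.
    private final int maxHeld;

    public PairingBuffer(int maxHeld) {
        this.maxHeld = maxHeld;
    }

    // True while it is safe to pull more unmatched items from the queue.
    public boolean hasCapacity() {
        return held.size() < maxHeld;
    }

    // If the partner for `key` is already held, remove it and return the
    // pair {partner, item}. Otherwise hold the item and return empty.
    public Optional<String[]> offer(String key, String item) {
        String partner = held.remove(key);
        if (partner != null) {
            return Optional.of(new String[] {partner, item});
        }
        if (!hasCapacity()) {
            throw new IllegalStateException("held buffer is full: " + maxHeld);
        }
        held.put(key, item);
        return Optional.empty();
    }

    public int heldCount() {
        return held.size();
    }

    public static void main(String[] args) {
        PairingBuffer buffer = new PairingBuffer(2);
        System.out.println(buffer.offer("order-1", "left").isPresent()); // false
        System.out.println(buffer.offer("order-2", "left").isPresent()); // false
        System.out.println(buffer.hasCapacity());                        // false
        String[] pair = buffer.offer("order-1", "right").get();
        System.out.println(pair[0] + "+" + pair[1]);                     // left+right
        System.out.println(buffer.heldCount());                          // 1
    }
}
```

The idea is that onTrigger checks hasCapacity() before pulling more
unmatched flowfiles, so the queue's own back pressure takes over once the
buffer is full. A completed pair is always accepted, even when full, since
it frees a slot.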

Just sanity checking: is it sane to move things between the 'current
session' and a 'held session' like this? It looks like that's pretty much
how MergeContent works, by having a session for each Bin. So instead of
having N bins/sessions, I've just got one extra session that holds onto
the FlowFiles that I've keyed/seen?

Thanks Mark!

Cheers,
Sam

On 2018/08/22 13:54:59, Mark Payne <[hidden email]> wrote:
Re: Reading all flowfiles queued for a processor (>20000 flowfiles)

Mark Payne
Sam,

Yes, that's right. The session provides a transaction. So each "bin" has its own session. This way,
once a Bin is ready to be combined, you can do so in a single transaction/session and then the session
is complete. No need to try to manage which bins contain which sessions or vice versa.

Thanks
-Mark



> On Aug 23, 2018, at 4:31 AM, TLdZPYSamI4WHPl1 TLdZPYSamI4WHPl1 <[hidden email]> wrote: