Batching Questions

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Batching Questions

Edgardo Vega
I am creating a custom processor and was attempting to support batching.
From reading the code for the standard nifi processor it seems like all I
have to do is:

final List<FlowFile> flowFileBatch = session.get(50);

Is this correct?

The documentation states "If this annotation is present, the user will be
able to choose whether they prefer high throughput or lower latency in the
Processor’s Scheduling tab. " I pulled in the EvaluateXQuery processor and
I do not see where that is set. Is that feature not currently enabled?

Also if I decided not to all batching in the scheduling tab what would
happen in the processor when i do session.get(50), will it really only get
1 flow file or would it always get 50?

Thanks for the help ahead of time.
Reply | Threaded
Open this post in threaded view
|

Re: Batching Questions

Mark Payne
Edgardo,

There really are 2 ways to do batching: calling session.get(50) and
using the @SupportsBatching annotation.

I realize that it seems weird to have 2 different mechanisms, but each
provides advantages/tradeoffs.

In general you should now just use the @SupportsBatching annotation.

When that annotation is present, what happens is that the framework will
handle all of the batching for you. So
you can just do "FlowFile flowFile = session.get();" and not even worry
about batching at all. This means that the user
is also able to choose how much batching should occur.

EvaluateXQuery uses "session.get(50)" for a very specific reason: it has
to compile the XQuery each time that onTrigger
is called (because the compiled object is not thread-safe). As a result,
it wants to pull a bunch of FlowFiles (up to 50) and
process them all. If it just used @SupportsBatching it would end up
having to recompile the XQuery for every single FlowFile,
which would be very inefficient.

Does this answer your question?

If something doesn't make sense or if you have any further questions,
I'm happy to help!

Thanks
-Mark


------ Original Message ------
From: "Edgardo Vega" <[hidden email]>
To: [hidden email]
Sent: 4/21/2015 11:32:44 AM
Subject: Batching Questions

>I am creating a custom processor and was attempting to support
>batching.
>From reading the code for the standard nifi processor it seems like all
>I
>have to do is:
>
>final List<FlowFile> flowFileBatch = session.get(50);
>
>Is this correct?
>
>The documentation states "If this annotation is present, the user will
>be
>able to choose whether they prefer high throughput or lower latency in
>the
>Processor’s Scheduling tab. " I pulled in the EvaluateXQuery processor
>and
>I do not see where that is set. Is that feature not currently enabled?
>
>Also if I decided not to all batching in the scheduling tab what would
>happen in the processor when i do session.get(50), will it really only
>get
>1 flow file or would it always get 50?
>
>Thanks for the help ahead of time.
Reply | Threaded
Open this post in threaded view
|

Re: Batching Questions

Ryan Blue
Thanks for explaining, Mark. I found this really helpful.

For EvaluateXQuery, could we use a ThreadLocal to ensure the XQuery is
only used by a single thread, rather than implementing a batch mechanism
that can't be tuned by the user?

rb

On 04/21/2015 08:48 AM, Mark Payne wrote:

> Edgardo,
>
> There really are 2 ways to do batching: calling session.get(50) and
> using the @SupportsBatching annotation.
>
> I realize that it seems weird to have 2 different mechanisms, but each
> provides advantages/tradeoffs.
>
> In general you should now just use the @SupportsBatching annotation.
>
> When that annotation is present, what happens is that the framework will
> handle all of the batching for you. So
> you can just do "FlowFile flowFile = session.get();" and not even worry
> about batching at all. This means that the user
> is also able to choose how much batching should occur.
>
> EvaluateXQuery uses "session.get(50)" for a very specific reason: it has
> to compile the XQuery each time that onTrigger
> is called (because the compiled object is not thread-safe). As a result,
> it wants to pull a bunch of FlowFiles (up to 50) and
> process them all. If it just used @SupportsBatching it would end up
> having to recompile the XQuery for every single FlowFile,
> which would be very inefficient.
>
> Does this answer your question?
>
> If something doesn't make sense or if you have any further questions,
> I'm happy to help!
>
> Thanks
> -Mark
>
>
> ------ Original Message ------
> From: "Edgardo Vega" <[hidden email]>
> To: [hidden email]
> Sent: 4/21/2015 11:32:44 AM
> Subject: Batching Questions
>
>> I am creating a custom processor and was attempting to support batching.
>> From reading the code for the standard nifi processor it seems like all I
>> have to do is:
>>
>> final List<FlowFile> flowFileBatch = session.get(50);
>>
>> Is this correct?
>>
>> The documentation states "If this annotation is present, the user will be
>> able to choose whether they prefer high throughput or lower latency in
>> the
>> Processor’s Scheduling tab. " I pulled in the EvaluateXQuery processor
>> and
>> I do not see where that is set. Is that feature not currently enabled?
>>
>> Also if I decided not to all batching in the scheduling tab what would
>> happen in the processor when i do session.get(50), will it really only
>> get
>> 1 flow file or would it always get 50?
>>
>> Thanks for the help ahead of time.


--
Ryan Blue
Software Engineer
Cloudera, Inc.
Reply | Threaded
Open this post in threaded view
|

Re: Batching Questions

Mark Payne
Ryan,

ThreadLocal can get really problematic, because you don't tie threads to
a processor. There's a big thread pool that may have a couple hundred
threads in it, and you'd have one of these for each thread. The memory
footprint of a compiled XQuery is probably not terribly bad, though.

Along the same lines, you could use an ObjectPool to store the objects
and then synchronize on the object while using it. That would avoid the
potentially very many ThreadLocal objects.


------ Original Message ------
From: "Ryan Blue" <[hidden email]>
To: [hidden email]
Sent: 4/21/2015 1:09:57 PM
Subject: Re: Batching Questions

>Thanks for explaining, Mark. I found this really helpful.
>
>For EvaluateXQuery, could we use a ThreadLocal to ensure the XQuery is
>only used by a single thread, rather than implementing a batch
>mechanism that can't be tuned by the user?
>
>rb
>
>On 04/21/2015 08:48 AM, Mark Payne wrote:
>>Edgardo,
>>
>>There really are 2 ways to do batching: calling session.get(50) and
>>using the @SupportsBatching annotation.
>>
>>I realize that it seems weird to have 2 different mechanisms, but each
>>provides advantages/tradeoffs.
>>
>>In general you should now just use the @SupportsBatching annotation.
>>
>>When that annotation is present, what happens is that the framework
>>will
>>handle all of the batching for you. So
>>you can just do "FlowFile flowFile = session.get();" and not even
>>worry
>>about batching at all. This means that the user
>>is also able to choose how much batching should occur.
>>
>>EvaluateXQuery uses "session.get(50)" for a very specific reason: it
>>has
>>to compile the XQuery each time that onTrigger
>>is called (because the compiled object is not thread-safe). As a
>>result,
>>it wants to pull a bunch of FlowFiles (up to 50) and
>>process them all. If it just used @SupportsBatching it would end up
>>having to recompile the XQuery for every single FlowFile,
>>which would be very inefficient.
>>
>>Does this answer your question?
>>
>>If something doesn't make sense or if you have any further questions,
>>I'm happy to help!
>>
>>Thanks
>>-Mark
>>
>>
>>------ Original Message ------
>>From: "Edgardo Vega" <[hidden email]>
>>To: [hidden email]
>>Sent: 4/21/2015 11:32:44 AM
>>Subject: Batching Questions
>>
>>>I am creating a custom processor and was attempting to support
>>>batching.
>>>From reading the code for the standard nifi processor it seems like
>>>all I
>>>have to do is:
>>>
>>>final List<FlowFile> flowFileBatch = session.get(50);
>>>
>>>Is this correct?
>>>
>>>The documentation states "If this annotation is present, the user
>>>will be
>>>able to choose whether they prefer high throughput or lower latency
>>>in
>>>the
>>>Processor’s Scheduling tab. " I pulled in the EvaluateXQuery
>>>processor
>>>and
>>>I do not see where that is set. Is that feature not currently
>>>enabled?
>>>
>>>Also if I decided not to all batching in the scheduling tab what
>>>would
>>>happen in the processor when i do session.get(50), will it really
>>>only
>>>get
>>>1 flow file or would it always get 50?
>>>
>>>Thanks for the help ahead of time.
>
>
>-- Ryan Blue
>Software Engineer
>Cloudera, Inc.
Reply | Threaded
Open this post in threaded view
|

Re: Batching Questions

Edgardo Vega
In reply to this post by Mark Payne
Mark,

So if the object was thread safe you would initialize it in a @OnScheduled method then use the @SupportsBatching on the class and be done?

Cheers,

Edgardo
Reply | Threaded
Open this post in threaded view
|

Re: Batching Questions

Mark Payne
Edgardo,

Yup, that's it! Then it's entirely up to the user whether or not (and to
what extend) batching occurs.

Thanks
-Mark

------ Original Message ------
From: "vegaed" <[hidden email]>
To: [hidden email]
Sent: 4/21/2015 3:35:33 PM
Subject: Re: Batching Questions

>Mark,
>
>So if the object was thread safe you would initialize it in a
>@OnScheduled
>method then use the @SupportsBatching on the class and be done?
>
>Cheers,
>
>Edgardo
>
>
>
>--
>View this message in context:
>http://apache-nifi-incubating-developer-list.39713.n7.nabble.com/Batching-Questions-tp1196p1201.html
>Sent from the Apache NiFi (incubating) Developer List mailing list
>archive at Nabble.com.