Batch Processors

Batch Processors

Mike Drob-2
NiFi experts,

Let's say that I want to send data from NiFi to some destination that works
much better when the documents are batched. I do not think this is an
unreasonable ask.

I imagine that I would want to first combine all of the records in one
processor, and then pass them on to a dedicated processor that sends the
data. I'm not sure yet whether I would be able to use existing processors
for this or would need to create my own, but this part feels fairly
straightforward.

Next, let's imagine that some document in the batch causes it to fail. I
would like to un-batch, create smaller batches, and try to send those,
assuming that some piece of the data was malformed rather than a transient
error like the network being unavailable. Is this pattern workable? I can
imagine several layers of fail/split/retry to winnow from 1000 documents
to 100 to 10 to 1, so that I can still get most of my data sent and know
exactly which documents fail.
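
Roughly, the flow I am picturing looks like this, where PutDestination is
a hypothetical stand-in for whatever send processor I end up with, and
the split steps would presumably use stock processors like SplitText:

    MergeContent (1000 docs) --> PutDestination
      failure --> split into 100-doc batches --> PutDestination
        failure --> split into 10-doc batches --> PutDestination
          failure --> send documents one at a time; anything that still
          fails is known-bad and can be set aside for inspection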

I'm largely thinking out loud here; somebody stop me if I'm off the deep
end, or if this has been done before and we have examples (I didn't see
any that were readily apparent).

Mike

Re: Batch Processors

Joe Witt
Mike,

This is extremely common; both sides of this are. You have some
low-latency or batch producer and you want to deliver to some
low-latency or batch receiver.

This is what splitting is for (in the case of going from large to small)
and what joining is for (in the case of 'batching').  MergeContent is
designed for the batching/aggregation case.  It allows you to merge using
a couple of strategies, with binary concatenation being the most common.

The classic example is receiving a live stream of data which needs to be
sent to HDFS.  We'd set up MergeContent to aggregate data to a size that
is close to or matches the desired HDFS block size.
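
For example, the relevant MergeContent settings would look something like
this (values illustrative; check the processor documentation for the
exact property names):

    Merge Strategy         Bin-Packing Algorithm
    Merge Format           Binary Concatenation
    Minimum Group Size     120 MB
    Maximum Group Size     128 MB   <- at or just under the HDFS block size
    Max Bin Age            5 min    <- so partially filled bins still flush

The bin age acts as a safety valve so data never waits indefinitely for
a bin to fill up.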

Now the interesting part you mention is: what if 'object' 45 of 100
causes a problem with the downstream system?  How would/could NiFi know
about that object?  Is it not feasible to evaluate the data for its
fitness to merge prior to doing so?

Anyway - let us know what you're thinking in terms of how NiFi would
know which object was problematic or that any were problematic for
that matter.

Thanks
Joe


Re: Batch Processors

Mark Payne
In reply to this post by Mike Drob-2
Mike,

Typing on phone now. Will respond more thoroughly when I'm at a computer in a couple of hours. Short answer: look at PutKafka. It does something similar when sending delineated records.

Thanks
-Mark

Sent from my iPhone


Re: Batch Processors

Mike Drob-2
In reply to this post by Joe Witt
On Tue, Feb 24, 2015 at 8:35 AM, Joe Witt <[hidden email]> wrote:

> Now the interesting part you mention is: what if 'object' 45 of 100
> causes a problem with the downstream system?  How would/could NiFi know
> about that object?  Is it not feasible to evaluate the data for its
> fitness to merge prior to doing so?

NiFi would only know that the batch failed. Maybe we would know that it
failed with 'bad data' rather than 'connection timed out', but I don't
think we would know that it failed with 'bad data at #45'.

The use case would be sending data with some kind of constraint. Maybe
field 1 is numeric and field 2 is some kind of date-time, and NiFi can
validate the schema (if it knows about it). But there might also be
business rules: field 3 cannot be empty if field 4 is empty; field 5 must
match an existing username. These are certainly possible to validate in
NiFi, but it gets much harder to do so.
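
For the simpler constraints, I could picture a small custom processor
that checks each record before it is ever merged, routing it to 'valid'
or 'invalid'. A rough sketch, where checkBusinessRules() is a
hypothetical stand-in for whatever rules actually apply:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.nifi.flowfile.FlowFile;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.io.InputStreamCallback;

    public class ValidateRecords extends AbstractProcessor {

        static final Relationship REL_VALID =
                new Relationship.Builder().name("valid").build();
        static final Relationship REL_INVALID =
                new Relationship.Builder().name("invalid").build();

        @Override
        public Set<Relationship> getRelationships() {
            final Set<Relationship> rels = new HashSet<>();
            rels.add(REL_VALID);
            rels.add(REL_INVALID);
            return rels;
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) {
            FlowFile flowFile = session.get();
            if (flowFile == null) {
                return;
            }
            final AtomicBoolean valid = new AtomicBoolean(true);
            // Read the content and apply the schema/business-rule checks.
            session.read(flowFile, new InputStreamCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    valid.set(checkBusinessRules(in));
                }
            });
            session.transfer(flowFile, valid.get() ? REL_VALID : REL_INVALID);
        }

        // Hypothetical stand-in for the real constraint logic (numeric
        // fields, date-times, username lookups, etc.).
        private boolean checkBusinessRules(InputStream in) throws IOException {
            return true;
        }
    }

Then only the 'valid' relationship would feed the merge step.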



Re: Batch Processors

Joe Witt
Mike,

Sounds like there are two approaches here (or perhaps a combination).

First is to have the flow configured to detect bad data/validate good
data before merging.

Second is to wait for failures to occur, then establish a configuration
which allows you to determine which object(s) were problematic.

Both patterns have been employed, both in the design of some processors
and in the design of flows.  In my experience, when you can get away
with validation up front, it is the more 'cost effective' approach.
But your case may differ.
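
To make the first approach concrete, the flow might look roughly like
this (ValidateRecords and PutDestination being hypothetical stand-ins
for the validation and delivery steps):

    source --> ValidateRecords --(valid)--> MergeContent --> PutDestination
                      |
                      +--(invalid)--> quarantine for inspection

A bad record then never makes it into a batch, and a failure at the
delivery step can be treated as transient and simply retried.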

If you want to dive into more specifics of your case, I'm happy to do so.

Thanks
Joe



Re: Batch Processors

Bryan Bende
Mike,

You might also want to look at BinFiles. Joe Gresock submitted a pull
request recently that separated the binning logic from MergeContent into a
base class (BinFiles).

So if you wanted more control over processing the batch, you could write a
processor that extends BinFiles. This might help in deciding what to do
with failures because you would have access to the individual FlowFiles
that make up the batch.
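
I haven't dug into the patch deeply, so treat the hook below as an
assumption about its shape rather than the actual API, but the idea
would be roughly:

    // Shape only: processBin() and the Bin accessor are assumed names;
    // check the BinFiles class from the pull request for the real hooks.
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processors.standard.BinFiles;  // package may differ
    import org.apache.nifi.processors.standard.util.Bin;  // package may differ

    public class PutBatchedDocuments extends BinFiles {

        @Override
        protected boolean processBin(final Bin bin, final ProcessContext context) {
            // The Bin holds the individual FlowFiles that were grouped,
            // so on a delivery failure you still know exactly which
            // documents made up the batch and can route or retry them
            // individually instead of failing the whole merge.
            return sendBatch(bin);  // hypothetical delivery helper
        }

        // Stand-in for the real delivery logic.
        private boolean sendBatch(final Bin bin) {
            return true;
        }
    }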

-Bryan
