FlowFile InputStreams

Bryan Bende
Is it generally a bad idea to read from a FlowFile InputStream outside of
the session.read() /InputStreamCallback?

For example, if you did something like the following:

        final ObjectHolder<InputStream> inputStreamHolder = new ObjectHolder<>(null);

        session.read(flowFile, new InputStreamCallback() {
            @Override
            public void process(InputStream in) throws IOException {
                inputStreamHolder.set(in);
            }
        });

        // read from inputStreamHolder.get()

I was expecting that this wouldn't work, but it appears that it does, so I
am wondering if there are any negative ramifications of doing this.

-Bryan

Re: FlowFile InputStreams

Joe Witt
Bryan,

Mark has the most up-to-date mental model of the ProcessSession, so he
might need to completely refute or modify what I say here.  But the
intent of the API as designed was that one would not do this, nor
should one be able to, as you, in my opinion, correctly assumed.

The point of the callback is to enforce important conditions and state
of the process session.  Doing what you have there means that it could
become 'unclear' which input stream you're reading (to you as the
developer) if there are also operations being done which modify the
flow file.

The reason you're able to do that is that we're actually blocking the
close of the root input stream.  We do close the two other streams that
wrap it (LimitedInputStream and ByteCountingInputStream), which are how
we ensure you only read up to the number of bytes appropriate for a
given flowfile and how the framework tracks the actual number of bytes
read.  However, even though those have been closed, we're not doing
anything to block them from still being used (that I *think* is a bug).
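
To make the role of those wrappers concrete, here is a minimal sketch of a stream that stops at a FlowFile's byte boundary and counts what was read. It is an illustration only: `BoundedCountingInputStream` is a hypothetical class that folds both ideas into one, not NiFi's actual LimitedInputStream or ByteCountingInputStream, and it assumes the FlowFile's content sits inside a larger shared stream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in, NOT NiFi's LimitedInputStream/ByteCountingInputStream.
class BoundedCountingInputStream extends FilterInputStream {
    private final long limit;    // number of bytes that belong to this FlowFile
    private long bytesRead = 0;  // what a framework could record afterwards

    BoundedCountingInputStream(InputStream in, long limit) {
        super(in);
        this.limit = limit;
    }

    @Override
    public int read() throws IOException {
        if (bytesRead >= limit) {
            return -1; // report end-of-stream at the FlowFile boundary
        }
        int b = super.read();
        if (b >= 0) {
            bytesRead++;
        }
        return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return 0;
        }
        if (bytesRead >= limit) {
            return -1;
        }
        // Never hand back more than what is left of this FlowFile.
        int allowed = (int) Math.min(len, limit - bytesRead);
        int n = super.read(buf, off, allowed);
        if (n > 0) {
            bytesRead += n;
        }
        return n;
    }

    // skip(), available() and mark/reset are left out to keep the sketch short.
    long getBytesRead() {
        return bytesRead;
    }
}
```

Note that nothing in this sketch refuses reads after close(), which is the gap described above.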

I also notice that the ProcessSession.read(...) javadocs indicate an
IllegalStateException should be thrown if this method is being called
from within a callback of another method in this session for the given
flow file.  But I don't see that this would occur.  I am guessing this
constraint was relaxed when the closeability/timing of close was
adjusted.  We should clarify this and either fix the API description or
enforce it.

Mark can you elaborate, correct, advise?

Thanks
Joe


On Thu, Mar 5, 2015 at 9:58 PM, Bryan Bende <[hidden email]> wrote:

> Is it generally a bad idea to read from a FlowFile InputStream outside of
> the session.read() /InputStreamCallback?

Re: FlowFile InputStreams

Mark Payne
Joe,

All seems correct to me. Great analysis - that is not an easy piece of code to follow!

On the OutputStream side, the stream is wrapped in a DisableOnCloseOutputStream. The input may instead be wrapped in a NonCloseableInputStream, but it should use a DisableOnCloseInputStream instead. This is definitely a bug.
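
As a rough illustration of the "disable on close" idea, the sketch below shows a wrapper that leaves the underlying stream open but refuses any further use once close() has been called. The class name `DisableAfterCloseInputStream` and its behaviour are assumptions made for this example. It is not the source of NiFi's DisableOnCloseInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch; not NiFi's DisableOnCloseInputStream.
class DisableAfterCloseInputStream extends FilterInputStream {
    private volatile boolean closed = false;

    DisableAfterCloseInputStream(InputStream in) {
        super(in);
    }

    private void ensureOpen() throws IOException {
        if (closed) {
            throw new IOException("Stream is closed; read it inside the callback");
        }
    }

    @Override
    public int read() throws IOException {
        ensureOpen();
        return super.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        return super.read(b, off, len);
    }

    @Override
    public long skip(long n) throws IOException {
        ensureOpen();
        return super.skip(n);
    }

    @Override
    public int available() throws IOException {
        ensureOpen();
        return super.available();
    }

    @Override
    public void close() {
        // Only disable this wrapper; the owner of the underlying stream
        // decides when that stream is really closed.
        closed = true;
    }
}
```

With a wrapper like this, a stream reference that escapes the callback fails on its first read instead of silently returning data.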

Sent from my iPhone

> On Mar 5, 2015, at 10:42 PM, Joe Witt <[hidden email]> wrote:
>
> The reason you're able to do that is that we're actually blocking the
> close of the root input stream.  But we have two other streams which
> wrapped that which we did close (LimitedInputStream and
> ByteCountingInputStream) which are how we ensure you only read up to
> the amount of bytes appropriate for a given flowfile and so that the
> framework can track actual number of bytes read.  However, even though
> those have been closed we're not doing anything to block them from
> still being used (that I *think* is a bug).

Re: FlowFile InputStreams

Mike Drob-2
In addition to fixing the code bug, it might be good to build upon the
existing JavaDocs with excerpts from Joe's response.

Mike

On Fri, Mar 6, 2015 at 8:10 AM, Mark Payne <[hidden email]> wrote:

> On the OutputStreams, it is wrapped in a DisableOnCloseOutputStream. The
> input may instead be wrapped in a NonCloseableInputStream, but it should
> use a DisableOnCloseInputStream instead. This is definitely a bug.

Re: FlowFile InputStreams

Michael D. Coon
Hey guys,
   I'm not the expert, but I have always assumed that if I tried to do anything with the stream outside of the session/read/process method, NiFi would already have closed it, moved it, or done something else to it on the way to the processor that comes after me; the state of that stream is unknown outside of that session/read/process callback. I would also have assumed that the stream would be marked internally so that I could never read it again after the session/read/process method returns. Am I being overly simplistic in my thoughts about how streams are managed internally, or is that what you're alluding to, Joe, about throwing an exception if additional access attempts are made?
Mike

On Friday, March 6, 2015 9:27 AM, Mike Drob <[hidden email]> wrote:

> In addition to fixing the code bug, it might be good to build upon the
> existing JavaDocs with excerpts from Joe's response.
>
> Mike


Re: FlowFile InputStreams

Joe Witt
Mike,

You are correct.  We overly relaxed an assumption somewhere.  It
should be as you and Bryan both assumed.  Easy fix, it sounds like.
And to Mike Drob's point, either the API docs need to be modified to
match reality or reality needs to be modified to match the javadocs.

Mark: You gonna create the ticket for this or shall I?

Thanks
Joe

On Fri, Mar 6, 2015 at 9:36 AM, Michael D. Coon
<[hidden email]> wrote:

> Hey guys,
>    I'm not the expert, but I have always assumed that if I tried to do anything with the stream outside of the session/read/process method, NiFi would already have closed it, moved it, or done something else to it on the way to the processor that comes after me; the state of that stream is unknown outside of that session/read/process callback. I would also have assumed that the stream would be marked internally so that I could never read it again after the session/read/process method returns. Am I being overly simplistic in my thoughts about how streams are managed internally, or is that what you're alluding to, Joe, about throwing an exception if additional access attempts are made?
> Mike

Re: FlowFile InputStreams

Mark Payne
In reply to this post by Bryan Bende
I created a ticket for this issue: https://issues.apache.org/jira/browse/NIFI-396

I marked it as a beginner ticket. The description is quite verbose, which could perhaps scare away someone new to the project, but it’s a pretty easy fix and the ticket is very detailed about how it needs to be fixed.


Regarding the IllegalStateException, Joe: that does in fact occur in the validateRecordState method: the method checks the recursionSet; the FlowFile is added to/removed from the recursionSet before/after the callback is executed. Ironically, you wrote this piece of code! 😉
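
The guard described above can be sketched roughly as follows. Only the names `recursionSet`, `validateRecordState`, and the use of IllegalStateException come from this thread. The `GuardedSession` class, the string FlowFile ids, and the Runnable callback are hypothetical simplifications, not NiFi's implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of a session that rejects nested access
// to the same FlowFile from inside one of its own callbacks.
class GuardedSession {
    private final Set<String> recursionSet = new HashSet<>();

    private void validateRecordState(String flowFileId) {
        if (recursionSet.contains(flowFileId)) {
            throw new IllegalStateException(
                    flowFileId + " is already being accessed within a callback");
        }
    }

    void read(String flowFileId, Runnable callback) {
        validateRecordState(flowFileId);
        recursionSet.add(flowFileId);        // added before the callback runs
        try {
            callback.run();
        } finally {
            recursionSet.remove(flowFileId); // removed after it returns
        }
    }
}
```

In this model a nested read of the same id fails, while a nested read of a different id, or a later read of the same id, goes through.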


Props to Bryan for catching this. And to answer your original question: yes, it’s a bad idea to consume outside of the callback 😊
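
For contrast with the snippet in the original question, here is a minimal sketch of consuming the content inside the callback and letting only the result escape. To keep it self-contained it does not use the NiFi API: `ReadCallback` and the static `read(...)` method are local stand-ins that mimic the shape of InputStreamCallback and session.read(...), and an AtomicReference plays the role of the ObjectHolder.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicReference;

// Local stand-in that mimics the shape of NiFi's InputStreamCallback.
interface ReadCallback {
    void process(InputStream in) throws IOException;
}

class CallbackReadSketch {
    // Hypothetical stand-in for session.read(flowFile, callback): the stream
    // is only meant to be used while the callback is running.
    static void read(byte[] content, ReadCallback callback) throws IOException {
        try (InputStream in = new ByteArrayInputStream(content)) {
            callback.process(in);
        }
    }

    static byte[] readFully(byte[] content) throws IOException {
        // Hold the data that was read, not the stream itself.
        final AtomicReference<byte[]> holder = new AtomicReference<>();
        read(content, in -> {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            holder.set(out.toByteArray());
        });
        return holder.get();
    }
}
```

Buffering everything in memory like this only suits small content. For large content, the processing itself would go inside the callback.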


Thanks

-Mark

From: Joe Witt
Sent: Friday, March 6, 2015 10:21 AM
To: [hidden email], Michael D. Coon

Mark: You gonna create the ticket for this or shall I?
