Stateful Processors


Stateful Processors

Bryan Bende
What are the best practices for implementing a processor that needs to
maintain some kind of state?

I'm thinking of a processor that executes on a timer and pulls data from
somewhere, but needs to know where it left off for the next execution, and
I was hoping to not involve an external data store here.

From looking at processors like GetJMS and GetKafka, I noticed the use of
BlockingQueue<> where poll() is called at the beginning of onTrigger(), and
then the object is put back in the queue in a finally block.

As far as I could tell it looks like the intent was to only have one object
in the queue, and use the queue as the mechanism for synchronizing access
to the shared object, so that if another thread called onTrigger it would
block on poll() until the previous execution put the object back in the
queue.

Is that the general approach?

Thanks,

Bryan
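The single-object queue pattern described above can be sketched in plain Java (illustrative names, not actual NiFi code; note that the no-argument poll() returns immediately, so take(), or poll() with a timeout, is what actually makes other threads wait):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch: a queue seeded with exactly one state object
// serializes access to that object across worker threads.
public class SingleStateQueue {
    private final BlockingQueue<StringBuilder> queue = new LinkedBlockingQueue<>();

    public SingleStateQueue() {
        queue.offer(new StringBuilder()); // exactly one shared state object
    }

    // Analogous to onTrigger(): take the state, use it, always give it back.
    public String appendAndSnapshot(String token) throws InterruptedException {
        // take() blocks until the previous caller returns the object;
        // a bare poll() would instead return null immediately if it is held.
        StringBuilder state = queue.take();
        try {
            state.append(token);
            return state.toString();
        } finally {
            queue.offer(state); // the finally block guarantees the object is returned
        }
    }
}
```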

Re: Stateful Processors

Joe Gresock
I'm also interested in the answers to Bryan's questions, if anyone has some
input.

Thanks,
Joe

--
I know what it is to be in need, and I know what it is to have plenty.  I
have learned the secret of being content in any and every situation,
whether well fed or hungry, whether living in plenty or in want.  I can do
all this through him who gives me strength.    *-Philippians 4:12-13*

Re: Stateful Processors

Joe Witt
Joe - thanks for bumping this.

Bryan,

"What are the best practices for implementing a processor that needs to
maintain some kind of state?

I'm thinking of a processor that executes on a timer and pulls data from
somewhere, but needs to know where it left off for the next execution, and
I was hoping to not involve an external data store here."

The only managed state the framework provides is through FlowFile objects
and the passing of them between processors.  To keep persistent accounting
of any processor state that exists outside of that, you do need to
implement some state persistence mechanism yourself (to a file, to a
database, etc.).

One example of a processor that does this is the GetHttp processor.  It
interacts with web services and needs to keep track of any cache/ETag
information it receives so it can be smart about whether or not to pull the
same resource again, depending on whether the server indicates it has
changed.  It does this by saving off a file in
'conf/.httpCache-<<processor uuid>>'.  Using the processor uuid in the name
avoids conflicts with other processors of the same type and makes
referencing the file on startup very easy: if it is there, use it to
recover state; if not, start a new one.
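A minimal sketch of this kind of uuid-keyed, file-backed state recovery, with illustrative names rather than the actual GetHttp code:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Illustrative sketch of per-processor file persistence (hypothetical names;
// not the real GetHttp implementation).
public class ProcessorStateFile {
    private final Path stateFile;

    public ProcessorStateFile(Path confDir, String processorUuid) {
        // One file per processor instance avoids conflicts between
        // processors of the same type.
        this.stateFile = confDir.resolve(".state-" + processorUuid);
    }

    // On startup: if the file is there, recover state; if not, start fresh.
    public Properties recover() throws IOException {
        Properties state = new Properties();
        if (Files.exists(stateFile)) {
            try (InputStream in = Files.newInputStream(stateFile)) {
                state.load(in);
            }
        }
        return state; // empty when starting fresh
    }

    public void save(Properties state) throws IOException {
        try (OutputStream out = Files.newOutputStream(stateFile)) {
            state.store(out, "processor state");
        }
    }
}
```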

That said, it is clearly desirable for the framework to offer some sort of
managed state mechanism for such simple cases.  We've talked about this
many times over the years but never pulled the trigger, because there was
always some aspect of our design ideas we didn't like.  So for right now
you'll need to implement state persistence like this outside the
framework.  But I've also kicked off a Jira for doing something about this
here: https://issues.apache.org/jira/browse/NIFI-259

What you were seeing in the GetKafka and GetJMS processors was management
of state that involves interaction with their specific resources (Kafka,
JMS).  In the case of JMS it was a connection-pooling mechanism, and in the
case of Kafka it was part of Kafka's stream iterator.  That is a different
thing from the managed persistent state you're asking about.

This is an important topic for us to communicate very well on.  Please feel
free to keep firing away until we've answered it fully.

Thanks
Joe


Re: Stateful Processors

Adam Taft
Also of note, the distributed cache service is probably the closest thing
to a cluster-wide framework state management service.  It currently uses
our own persistence backend, but it's conceivable to adapt the distributed
cache to use a database, a JNDI resource, or a true cache engine like
Ehcache.

Adam



Re: Stateful Processors

Bryan Bende
Thanks Joe G. for reviving this thread, and thanks Joe W. and Adam for the
info.  I see what you are talking about with the GetHttp processor; that
makes sense to me now. I will take a look at the distributed cache as well.

Somewhat related, I think I am also trying to make sure I understand how
threads and processors interact, and how to make sure a processor is
thread-safe. It seems as though any processor member variables that might
be changed during onTrigger should be volatile, or of an atomic type, to
ensure the current thread is accessing the proper value.

If you add a processor to the flow and never increase the concurrent tasks
attribute, do you still expect multiple threads to access the processor
concurrently? I'm assuming each processor you drag on to the flow is an
instance (with the unique uuid Joe mentioned) and then the concurrent tasks
equate to threads that execute tasks for that instance. Just trying to make
sure I understand how this is working.

Thanks,

Bryan


Re: Stateful Processors

Joe Witt
Bryan,

"It seems as though any processor member variables that might
be changed during onTrigger should be volatile, or of an atomic type, to
ensure the current thread is accessing the proper value."

"I'm assuming each processor you drag on to the flow is an
instance (with the unique uuid Joe mentioned) and then the concurrent tasks
equate to threads that execute tasks for that instance."

Both of those statements/expectations are absolutely correct.  Sounds like
you're pretty locked in here.

Thanks
Joe
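As a plain-Java sketch of the member-variable pattern being confirmed here (hypothetical names, no NiFi API):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: member state touched on each trigger, kept visible
// across the different pool threads that may run the task over time.
public class CounterState {
    // volatile guarantees each thread sees the most recently written value
    private volatile String lastSeenId = null;
    // an atomic type allows safe read-modify-write without external locking
    private final AtomicLong triggerCount = new AtomicLong();

    // Stand-in for the work a single scheduled task would do per execution.
    public long recordTrigger(String id) {
        lastSeenId = id;
        return triggerCount.incrementAndGet();
    }

    public String lastSeenId() {
        return lastSeenId;
    }
}
```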


RE: Stateful Processors

Mark Payne
Bryan,

"If you add a processor to the flow and never increase the concurrent tasks
attribute, do you still expect multiple threads to access the processor concurrently?"

The answer here is "sort of."

In this case, you'll never have more than 1 thread in the 'onTrigger' method at the same time.
However, there is a thread pool that is shared by all Processors, so you may have 'Thread 1' execute
onTrigger, then 'Thread 2' execute onTrigger, then 'Thread 3', and then 'Thread 1' again. So while they
won't be executing concurrently, the Processor must still be thread-safe, because you will have different
threads accessing the same member variables.

Additionally, if you have methods annotated with @OnScheduled, @OnUnscheduled, etc., then these
will be executed in a separate thread potentially while onTrigger is being executed.

You'll also want to be careful if any of your member variables are mutable objects. I.e., marking a List
as volatile won't make things thread-safe.
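A short sketch of that pitfall and one common alternative, in plain Java with hypothetical names: 'volatile' only makes the reference visible, so instead of mutating a shared List in place, publish an immutable snapshot through an AtomicReference.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: never mutate the published list in place; swap in an immutable
// copy instead, so readers always see a consistent snapshot.
public class SafeListHolder {
    private final AtomicReference<List<String>> items =
            new AtomicReference<>(Collections.<String>emptyList());

    // Copy-on-write style update, retried if another thread raced us.
    public void add(String item) {
        while (true) {
            List<String> current = items.get();
            List<String> updated = new ArrayList<>(current);
            updated.add(item);
            if (items.compareAndSet(current, Collections.unmodifiableList(updated))) {
                return;
            }
        }
    }

    public List<String> snapshot() {
        return items.get(); // safe to hand out: the list is immutable
    }
}
```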

If it makes sense to only allow your Processor to be run by one thread at a time, though, you can add
the @TriggerSerially annotation to the Processor to let the framework know that the "Concurrent Tasks"
setting shouldn't even be exposed to the user and should always be 1.

Thanks
-Mark



Re: Stateful Processors

Adam Taft
On Thu, Jan 15, 2015 at 7:48 AM, Mark Payne <[hidden email]> wrote:

> Additionally, if you have methods annotated with @OnScheduled,
> @OnUnscheduled, etc., then these
> will be executed in a separate thread potentially while onTrigger is being
> executed.
>

Ouch, really?  You can't guarantee that your @OnScheduled method has
completed before your onTrigger() method is invoked?

The whole point of @OnScheduled is to have a place to perform expensive
resource allocations or other one-time setup needed before onTrigger is
even called. Saying that @OnScheduled doesn't finish before onTrigger is
called is probably a significant issue, since there are likely processors
relying on this ordering.

Since it's come up many times in conversation, Mark, do you think you could
include a full description of the entire lifecycle of a processor?  This
would ideally include which methods or annotations are fired at what time,
which lifecycle "happens-before" guarantees a processor developer can rely
on, etc.

RE: Stateful Processors

Mark Payne
Sorry, good call, Adam! @OnScheduled methods are completed before onTrigger is called. However, @OnUnscheduled could be called while your onTrigger method is executing.

Regarding the documentation of the lifecycle of Processors: this will be a major part of the Developer Guide that we are promising to put together.
