Improve S2S to ease "star deployments"


Improve S2S to ease "star deployments"

Pierre Villard
Hi all,

Here is my use case: I have multiple standalone NiFi instances deployed
over multiple sites (they could be MiNiFi instances) and a central NiFi
cluster. The standalone instances generate data, and the data is sent to
the central cluster to be parsed and enriched before being sent back to
the standalone instances. The data needs to go back to where it was
generated.

At the moment, since a Remote Process Group (RPG) cannot be configured
using Expression Language (EL) and FlowFile attributes, you need one port
(or one RPG, if the RPG is on the central NiFi's side) per standalone
instance. And I don't think that changing the RPG to evaluate FlowFile
attributes would be a good idea in terms of implementation.

Instead, I'd like to change the S2S protocol to allow RPG pulling based
on FlowFile attributes.

On the standalone instances, we would have:
Workflow generating data => RPG => workflow receiving enriched data from
central cluster

On the NiFi cluster, we would have:
input port => workflow parsing and enriching data => output port

The idea would be that, when configuring an output port in the RPG, it
would be possible to enable "host-based pulling" so that only flow files
whose 's2s.host' attribute matches the host of the instance hosting the
RPG would be pulled (the s2s.port attribute is already set when data is
sent through S2S).

I already started working on this approach and, even though I don't have
something fully functional yet, I wanted to discuss it here to make sure
it would be interesting for the wider community and, also, that I'm not
missing something obvious that would prevent it.

Happy to file a JIRA if that sounds interesting.

Thanks,
Pierre

Re: Improve S2S to ease "star deployments"

Mark Payne
Hey Pierre,

I'm not sure that this is the best route to go down. There are a couple of problems that I think
you will run into. The most important is what happens when the data going to that Output Port
builds up into a large queue. If a NiFi instance then requests data, I presume that the Output Port
would determine which FlowFiles to send by calling ProcessSession.get(FlowFileFilter).
But currently, if I'm not mistaken, that method only iterates over the data in the 'active' queue, not
data that is swapped out. As a result, you could have the active queue filled up with data for nodes
that are not pulling, and that would prevent any node from pulling data.

Even if we were to change it so that the get(FlowFileFilter) method runs through swapped-out data,
the expense of doing so would likely be cost-prohibitive for this approach, as the disk I/O needed to
constantly scan the swap files would be too expensive. To make the approach feasible you'd probably
also have to change the swap file format so that its "summary" also contains a mapping of s2s.host to
a count of FlowFiles for that host. And this is already getting well beyond the scope of what I think
you want to do here.

Additionally, I feel like where this concept is heading is difficult to explain and is designed for a rather
specific use case: it starts to turn this into a sort of quasi-pub/sub mechanism without being a true pub/sub.

Rather, I would propose that when the desire is to push data to a specific NiFi node, the preferred approach is
not to use Site-to-Site (as that's intended to be point-to-point between NiFi instances/clusters for well-established
endpoints). Typically, the approach taken for a scenario like this would be to have a ListenHTTP processor
running on each of the instances. They can push to the central instance using Site-to-Site. Then, rather than using an
Output Port, you'd use a PostHTTP processor to push the data back. PostHTTP already supports Expression Language
for the URL, and it has a "Send as FlowFile" option that properly packages the FlowFiles together with their attributes.
It also handles batching together small FlowFiles, supports two-phase commit to minimize the possibility of data
duplication, etc. This was the method used before Site-to-Site was added, and it worked quite well for a long time.
Site-to-Site was added for convenience, so that users could just point to a given URL, be provided the list of
available ports, and have it auto-load-balance across the cluster (if applicable). But in your use case, neither of
these really benefits you, because you don't know the URL to send to a priori and you already know exactly which
node to push to.
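
As a rough sketch of that wiring (the property names come from the stock processors, but the
port, path, and EL expression here are just illustrative assumptions):

On each standalone instance:

    ListenHTTP
        Listening Port: 8081
        Base Path: contentListener

On the central cluster, in place of the Output Port:

    PostHTTP
        URL: http://${s2s.host}:8081/contentListener
        Send as FlowFile: true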

Thanks
-Mark

Re: Improve S2S to ease "star deployments"

Pierre Villard
Hi Mark,

Thanks for the answer. You're right, I was going to use
ProcessSession.get(FlowFileFilter). And I was considering setting an
expiration date on the flow files, in case a standalone instance is not
pulling data, to ensure that the queue does not fill up. But I didn't
think about the data being swapped out and, you're right, we probably
don't want to change that.

The HTTP approach indeed sounds like a very good option for my use case.
Thanks for mentioning it.

Pierre

Re: Improve S2S to ease "star deployments"

Ed B
Pierre, Mark,
Although I agree it is possible to work around this with HTTP
(Post/Listen/Invoke), I still think that pub/sub for S2S (regardless of
whether it is within the same cluster or across different ones) is worth
discussing and implementing. That would make the implementation more
natural and mature.

Pierre, don't give up :) Let's discuss the ideas further and come up with
a design. In addition, I believe this idea should be combined with the
"Wormhole Connections" idea
<https://cwiki.apache.org/confluence/display/NIFI/Wormhole+Connections>.
That would give us a full design for procedural development in NiFi.

Thoughts?
Ed.

Re: Improve S2S to ease "star deployments"

Joe Percivall
Echoing Mark's advice, we have a "star deployment" and use InvokeHttp with
EL to send requests back "out".
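
As a sketch, with purely illustrative values (the callback attribute,
port, and path are assumptions specific to any given setup):

    InvokeHTTP
        HTTP Method: POST
        Remote URL: http://${s2s.host}:8081/contentListener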

For the addition of PUB/SUB to S2S, I'd be wary of scope creep. PUB/SUB has
entire Apache projects dedicated to getting it right.

Joe

Re: Improve S2S to ease "star deployments"

Pierre Villard
Yeah, I think the HTTP approach is fine for my use case, where the number
of "satellites" is limited.

However, we can extend this discussion to the IoT use case, where the
satellites are MiNiFi agents and the number of agents is in the thousands
or more. In that case the HTTP approach won't scale, and I don't know
whether MQTT can be used for this kind of targeted delivery (the enriched
data needs to go back to a specific agent). Curious to know how we could
answer that requirement.
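
For what it's worth, one hedged sketch of how targeted delivery over MQTT
might look (the topic layout and the agent.id attribute are purely
hypothetical):

On the central cluster:

    PublishMQTT
        Topic: enriched/${agent.id}

On each MiNiFi agent, subscribed only to its own topic:

    ConsumeMQTT
        Topic Filter: enriched/agent-42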

Pierre

Re: Improve S2S to ease "star deployments"

Aldrin Piri
MQTT can function in its own right outside of Site-to-Site, and MiNiFi
could support that using processors.

A while ago, I filed NIFI-1820 [1], which can be notionally summed up as
"extend Site-to-Site with caveats depending on the backing protocol."
Conceptually, I like what this could provide from a consistent
operator/user perspective: it would allow Site-to-Site to be the
prescribed way of handling data transport between instances within the
NiFi ecosystem using the FlowFile format. Implementation-wise, this
certainly affects many parts of the codebase and may additionally need a
major release to be feasible.

[1] https://issues.apache.org/jira/browse/NIFI-1820
