NiFi - Cluster crashes while running CaptureChangeMySQL for CDC

Purushotham Pushpavanthar
Hi,

I've been trying to run CDC on a 3-node NiFi (version 1.9.2) cluster, similar to
what is illustrated here:
<https://community.hortonworks.com/articles/113941/change-data-capture-cdc-with-apache-nifi-version-1-1.html>.
However, I've been facing the issue below because of scale. When I start the
processor, JVM heap (12 GB) utilization reaches 100% on the primary node, and
then the node crashes. The same repeats on the other nodes when a new cluster
coordinator/primary node is elected.

I tried debugging this issue and drafted the details below for discussion.

The processor initializes an unbounded *LinkedBlockingQueue* and passes it to
the event listener it registers with the BinaryLogClient.
The BinaryLogClient reads the binlog files and adds the events to the queue. When
triggered, the processor drains the queue, writes the events to flowfiles,
and transfers them to the SUCCESS relationship in a single thread. However, when
database throughput is high, the queue gets flooded with events, the
single-threaded processor fails to keep up, and the JVM heap on the primary
node bloats until the node crashes.
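To make the failure mode concrete, here is a minimal, self-contained Java simulation (not CaptureChangeMySQL's actual code) of an unbounded `LinkedBlockingQueue` that is filled faster than a single consumer drains it. The class name, the `simulate` method, and the rates of 1000 events produced versus 600 drained per round are illustrative assumptions; the events are stand-in `Long` positions rather than real binlog events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class UnboundedBacklogDemo {
    /**
     * Simulates `ticks` scheduling rounds. In each round the listener side enqueues
     * `produced` events and the single consumer drains at most `consumed` of them.
     * Returns the queue size (backlog) after each round.
     */
    static List<Integer> simulate(int ticks, int produced, int consumed) {
        // No-arg constructor: capacity is Integer.MAX_VALUE, so offer() effectively never fails.
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        List<Long> drained = new ArrayList<>();
        List<Integer> backlog = new ArrayList<>();
        long position = 0; // stand-in for a binlog position
        for (int t = 0; t < ticks; t++) {
            for (int i = 0; i < produced; i++) {
                queue.offer(position++); // listener side: never blocks, never rejects
            }
            drained.clear();
            queue.drainTo(drained, consumed); // consumer side: bounded work per round
            backlog.add(queue.size());
        }
        return backlog;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 1000, 600)); // [400, 800, 1200, 1600, 2000]
    }
}
```

Because the no-argument `LinkedBlockingQueue` constructor uses a capacity of `Integer.MAX_VALUE`, the backlog grows linearly for as long as the producer outpaces the consumer, and in a real JVM every queued event holds heap until it is drained.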
Below are the causes I suspect most (feel free to correct me if I'm wrong;
let's debate this for better understanding).

   1. Because this queue is decoupled from NiFi connections, the back
   pressure configuration has no control over the throughput of
   BinaryLogClient.
   2. I tried increasing the JVM memory settings from Xms3G and Xmx3G to *Xms32G
   and Xmx32G.* Before CDC, our cluster used to run at 20%-70% heap
   utilization with Xmx3G (for the past 4 months). I'm unable to budget the JVM
   usage. There should be a limit on how much of the cluster's resources a
   single processor can take.
   3. CaptureChangeMySQL being a single-threaded processor that runs only on
   the primary node adds to the above issue.
   4. The processor doesn't batch events. It ends up creating too many
   flowfiles whose content size is comparable to the size of their flowfile
   attributes held in memory.
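Point 4 could be addressed by packing several events into each flowfile. The sketch below is a hypothetical illustration, not an existing CaptureChangeMySQL option: `drainInBatches`, `maxBatch`, and `maxPayloads` are invented names, each returned string stands in for one flowfile's content, and newline-delimited JSON is only an assumed output format. It relies on `BlockingQueue.drainTo(Collection, int)` to pull at most `maxBatch` events per payload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchingSketch {
    /**
     * Drains up to maxPayloads batches of at most maxBatch events each and joins every
     * batch into one newline-delimited payload (a stand-in for one flowfile's content).
     */
    static List<String> drainInBatches(BlockingQueue<String> queue, int maxBatch, int maxPayloads) {
        if (maxBatch <= 0 || maxPayloads <= 0) {
            throw new IllegalArgumentException("maxBatch and maxPayloads must be positive");
        }
        List<String> payloads = new ArrayList<>();
        List<String> batch = new ArrayList<>(maxBatch);
        for (int i = 0; i < maxPayloads; i++) {
            int n = queue.drainTo(batch, maxBatch); // moves at most maxBatch queued events
            if (n == 0) {
                break; // nothing left to drain right now
            }
            payloads.add(String.join("\n", batch));
            batch.clear();
        }
        return payloads;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 7; i++) {
            queue.add("{\"event\":" + i + "}");
        }
        List<String> payloads = drainInBatches(queue, 3, 10);
        System.out.println(payloads.size() + " payloads for 7 events"); // 3 payloads for 7 events
    }
}
```

Capping `maxPayloads` keeps the work done per trigger bounded even if the listener keeps adding events while the loop runs.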

I'm posting this thread to start a discussion on how to solve this issue.
Please share your experiences if you have faced a similar issue
with the CaptureChangeMySQL processor or any other NiFi processor.
What workaround did you follow? How did you fix it?
Is NiFi the right tool for a CDC use case?
If so, should we have a separate cluster per CDC pipeline, depending on
scale, since executing CaptureChangeMySQL on the primary node is a bottleneck?

*Cluster specs:*
3-node cluster

   - Model : c5.2xlarge
   - vCPU : 8
   - Memory (GiB) : 16
   - Instance Storage (GiB) : 200 (EBS)
   - Network Bandwidth (Gbps) : Up to 10
   - EBS Bandwidth (Mbps) : Up to 3,500

*NiFi configs:*
*bootstrap.conf*

   - java.arg.2=-Xms12G
   - java.arg.3=-Xmx12G
   - java.arg.5=-Dsun.net.http.allowRestrictedHeaders=true
   - java.arg.6=-Djava.protocol.handler.pkgs=sun.net.www.protocol
   - java.arg.7=-XX:ReservedCodeCacheSize=256m
   - java.arg.8=-XX:CodeCacheMinimumFreeSpace=10m
   - java.arg.9=-XX:+UseCodeCacheFlushing


*nifi.properties* (I can share more configs if needed)

   - nifi.queue.backpressure.count=100000
   - nifi.queue.backpressure.size=1 GB


Regards,
Purushotham Pushpavanth

Re: NiFi - Cluster crashes while running CaptureChangeMySQL for CDC

Bryan Bende
I'm not very familiar with this processor, but I think we should
probably set a size on the blocking queue so that it can't grow
indefinitely, and possibly make the size configurable as a property of
the processor.
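A minimal sketch of this suggestion, assuming plain `java.util.concurrent` types rather than the processor's real classes: the queue is created with `new LinkedBlockingQueue<>(capacity)`, where `capacity` would come from a hypothetical processor property, and the producer uses `put()`, which waits for free space instead of growing the queue. `startProducer` and `consume` are illustrative names. Assuming the binlog listener callback runs on the thread that reads the binlog stream, blocking in it would pause reading until the processor catches up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedQueueSketch {
    /**
     * Hypothetical stand-in for the binlog listener callback. put() blocks while the
     * queue is full, so the producer pauses instead of the heap growing.
     */
    static Thread startProducer(BlockingQueue<Long> queue, int events) {
        Thread producer = new Thread(() -> {
            try {
                for (long e = 0; e < events; e++) {
                    queue.put(e); // waits for free capacity rather than allocating more
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        });
        producer.setDaemon(true); // never keeps the JVM alive if left blocked
        producer.start();
        return producer;
    }

    /** Consumer side: takes exactly `events` events, in arrival order. */
    static List<Long> consume(BlockingQueue<Long> queue, int events) throws InterruptedException {
        List<Long> received = new ArrayList<>(events);
        for (int i = 0; i < events; i++) {
            received.add(queue.take());
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        int capacity = 3; // would come from a hypothetical "max queue size" processor property
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>(capacity);
        Thread producer = startProducer(queue, 10);
        Thread.sleep(200); // give the producer time to fill the queue and block
        System.out.println("queued while producer is blocked: " + queue.size()); // never above 3
        List<Long> received = consume(queue, 10);
        producer.join();
        System.out.println("received " + received.size() + " events");
    }
}
```

`put()` is used rather than `offer()` because a rejected event would be lost from the change stream; the trade-off is that a slow consumer now stalls binlog reading instead of exhausting the heap.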

On Sat, Jul 13, 2019 at 2:05 PM Purushotham Pushpavanthar wrote:
