NiFi 1.3 - PublishKafka_0_10 - A message in the stream exceeds the maximum allowed message size of 1048576 bytes


mayank rathi
Hello All,

I am getting this error in the PublishKafka_0_10 processor for a message of
size 2.08 MB. I have set Max Request Size to 10 MB in the processor
properties and max.request.size to 10 MB in Kafka's server.properties.
After restarting the Kafka broker I can see max.request.size = 10 MB in the
Kafka logs, but I still get the error below.

What am I missing here?

2017-11-17 11:07:47,966 ERROR [Timer-Driven Process Thread-4] o.a.n.p.kafka.pubsub.PublishKafka_0_10 PublishKafka_0_10[id=e6d932d9-97ae-1647-aa8f-86d07791ce25] Failed to send all message for StandardFlowFileRecord[uuid=fa2399e5-bea5-4113-b58b-6cdef228733c,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1510934860019-132, container=default, section=132], offset=0, length=2160613],offset=0,name=12337127439954063,size=2160613] to Kafka; routing to failure due to org.apache.nifi.stream.io.exception.TokenTooLargeException: A message in the stream exceeds the maximum allowed message size of 1048576 bytes.: {}
org.apache.nifi.stream.io.exception.TokenTooLargeException: A message in the stream exceeds the maximum allowed message size of 1048576 bytes.
    at org.apache.nifi.stream.io.util.AbstractDemarcator.extractDataToken(AbstractDemarcator.java:157)
    at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:129)
    at org.apache.nifi.processors.kafka.pubsub.PublisherLease.publish(PublisherLease.java:78)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10$1.process(PublishKafka_0_10.java:334)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2136)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2106)
    at org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_10.onTrigger(PublishKafka_0_10.java:330)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1120)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Thanks and Regards
Mayank

--
NOTICE: This email message is for the sole use of the intended recipient(s)
and may contain confidential and privileged information. Any unauthorized
review, use, disclosure or distribution is prohibited. If you are not the
intended recipient, please contact the sender by reply email and destroy
all copies of the original message.

Re: NiFi 1.3 - PublishKafka_0_10 - A message in the stream exceeds the maximum allowed message size of 1048576 bytes

Koji Kawamura-2
Hi Mayank,

I tried to reproduce the issue, but so far without success.
PublishKafka_0_10 uses the specified Max Request Size as expected: I get
the exception only when the incoming message size exceeds the configured
size, and I was able to publish 2.08 MB messages with a 10 MB Max Request
Size.

The stack trace you reported is thrown inside NiFi's AbstractDemarcator
(StreamDemarcator) when it reads bytes from the incoming FlowFile content
and the amount read exceeds maxDataSize.
StreamDemarcator.maxDataSize is set to the 'Max Request Size' specified on
PublishKafka_0_10.
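
To illustrate the kind of check described above, here is a minimal sketch.
It is not NiFi's actual StreamDemarcator code: the class and method names are
hypothetical, it treats the whole stream as a single token (no demarcator),
and it throws a plain IOException where NiFi throws TokenTooLargeException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical, simplified stand-in for the size check; not NiFi's real implementation.
class TokenSizeCheckSketch {

    // Reads the whole stream as one token and fails as soon as it would grow past maxDataSize.
    static byte[] readSingleToken(InputStream in, int maxDataSize) throws IOException {
        ByteArrayOutputStream token = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int read;
        while ((read = in.read(buffer)) != -1) {
            if (token.size() + read > maxDataSize) {
                // NiFi raises TokenTooLargeException here; IOException keeps the sketch self-contained.
                throw new IOException("A message in the stream exceeds the maximum allowed message size of "
                        + maxDataSize + " bytes.");
            }
            token.write(buffer, 0, read);
        }
        return token.toByteArray();
    }

    public static void main(String[] args) {
        byte[] content = new byte[2160613]; // FlowFile size taken from the reported log line
        int[] limits = {10 * 1024 * 1024, 1048576}; // 10 MB, then the 1048576 bytes from the error
        for (int limit : limits) {
            try {
                int size = readSingleToken(new ByteArrayInputStream(content), limit).length;
                System.out.println("limit " + limit + ": read " + size + " bytes");
            } catch (IOException e) {
                System.out.println("limit " + limit + ": " + e.getMessage());
            }
        }
    }
}
```

With a limit of 10 MB the 2160613-byte content is read in full; with a limit
of 1048576 bytes the read is rejected, which is the shape of the error in the
reported stack trace.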

Does this issue still happen? If so, do you mind sharing your
processor configuration by exporting as a template?

Thanks,
Koji


On Sat, Nov 18, 2017 at 1:54 AM, mayank rathi <[hidden email]> wrote:

> [...]

Re: NiFi 1.3 - PublishKafka_0_10 - A message in the stream exceeds the maximum allowed message size of 1048576 bytes

mayank rathi
Thanks for replying, Koji. After your email I looked at the Kafka logs again
and found that message.max.bytes was set to accept only 1 MB messages. I
increased it to 10 MB, and messages are now being published to Kafka.

Thanks for helping !!
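
For anyone checking a similar setup, here is a minimal sketch of the
comparison that mattered here: the message has to fit under every size limit
on its path, not only the one on the processor. The setting names come from
this thread; the class and method names are hypothetical, and the 1 MB broker
value is assumed to be 1048576 bytes for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; it only compares numbers and does not talk to NiFi or Kafka.
class KafkaSizeLimitSketch {

    // Returns the names of all configured limits that are smaller than the message.
    static List<String> limitsExceeded(long messageBytes, Map<String, Long> limits) {
        List<String> exceeded = new ArrayList<>();
        for (Map.Entry<String, Long> limit : limits.entrySet()) {
            if (messageBytes > limit.getValue()) {
                exceeded.add(limit.getKey());
            }
        }
        return exceeded;
    }

    public static void main(String[] args) {
        long messageBytes = 2160613L; // FlowFile size from the log in the first post

        Map<String, Long> limits = new LinkedHashMap<>();
        limits.put("Max Request Size (PublishKafka_0_10)", 10L * 1024 * 1024);
        limits.put("message.max.bytes (broker)", 1048576L); // assumed value for "1 MB"

        // Any name printed here is a limit that still needs to be raised.
        System.out.println(limitsExceeded(messageBytes, limits));
    }
}
```

Once message.max.bytes is also raised to 10 MB, the list comes back empty for
this message size.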

On Sun, Nov 19, 2017 at 7:01 PM, Koji Kawamura <[hidden email]>
wrote:

> [...]


