NiFi FlowFile state when cloning

Matthew Watson
Hello,

After updating our NiFi setup to 1.4.0, one of our regression tests started to
fail.

On investigation I ran into an issue with FlowFile state that has confused
me.

The following processor takes an input FlowFile, writes "NEW_DATA" to it,
and then clones it.
It sends the original FlowFile and the clone to two different output
relationships.
I had expected the two outputs to be the same, but if I run this
through (using GenerateFlowFile to generate input), I get different output.

If I feed in the data "OLD", then:
OUTPUT_A receives a FlowFile with the written content "NEW_DATA".
OUTPUT_B receives the original content "OLD" that was sent into the
processor.

If instead I feed in data longer than 8 bytes (i.e. longer than "NEW_DATA"),
then CloneTestProcessor throws an exception and yields.
The error from the logs is below.

Any advice would be appreciated.

Thanks in advance,
Matthew Watson

Code:

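import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class CloneTestProcessor extends AbstractProcessor {

    public static final Relationship OUTPUT_A = new Relationship.Builder().name("OUTPUT_A").build();
    public static final Relationship OUTPUT_B = new Relationship.Builder().name("OUTPUT_B").build();

    @Override
    public Set<Relationship> getRelationships() {
        return ImmutableSet.of(OUTPUT_A, OUTPUT_B);
    }

    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {

        String DATA = "NEW_DATA";

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        // Replace the content with DATA; write() returns an updated FlowFile, stored in `output`
        FlowFile output = session.write(flowFile, (in, out) -> {
            out.write(DATA.getBytes());
        });

        // The clone and both transfers below use the original `flowFile` reference, not `output`
        FlowFile other = session.clone(flowFile);
        session.transfer(flowFile, OUTPUT_A);
        session.transfer(other, OUTPUT_B);
    }
}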
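Because NiFi FlowFile objects are immutable, `ProcessSession.write` returns an updated FlowFile rather than modifying the one passed in; the processor above stores that result in `output` but then clones and transfers the original `flowFile` reference. The following is a minimal toy model of that situation that assumes nothing about NiFi's internals: `ToyFlowFile`, `ToySession` and `run` are hypothetical names invented for illustration, and the session's rules (transfer resolves an id to its latest version, clone copies whatever snapshot it is handed) were chosen only to mirror the behaviour reported in this post.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: ToyFlowFile and ToySession are hypothetical stand-ins, not NiFi classes.
public class StaleReferenceDemo {

    // Immutable "FlowFile": an id plus a snapshot of its content.
    static final class ToyFlowFile {
        final String id;
        final String content;

        ToyFlowFile(String id, String content) {
            this.id = id;
            this.content = content;
        }
    }

    // Session that remembers the latest version of each FlowFile, keyed by id.
    static final class ToySession {
        private final Map<String, ToyFlowFile> latest = new HashMap<>();
        private int nextId = 0;

        ToyFlowFile create(String content) {
            return track(new ToyFlowFile("ff-" + nextId++, content));
        }

        // Never mutates its argument; returns a new version that should be used from now on.
        ToyFlowFile write(ToyFlowFile ff, String content) {
            return track(new ToyFlowFile(ff.id, content));
        }

        // Copies the content held by the reference it is given, stale or not (assumed rule).
        ToyFlowFile clone(ToyFlowFile ff) {
            return track(new ToyFlowFile("ff-" + nextId++, ff.content));
        }

        // Resolves the id to whatever version the session currently tracks (assumed rule).
        String transfer(ToyFlowFile ff) {
            return latest.get(ff.id).content;
        }

        private ToyFlowFile track(ToyFlowFile ff) {
            latest.put(ff.id, ff);
            return ff;
        }
    }

    // Runs the processor logic once; returns "OUTPUT_A content|OUTPUT_B content".
    public static String run(String input, boolean reassign) {
        ToySession session = new ToySession();
        ToyFlowFile flowFile = session.create(input);
        ToyFlowFile output = session.write(flowFile, "NEW_DATA");
        if (reassign) {
            flowFile = output; // keep working with the latest reference
        }
        ToyFlowFile other = session.clone(flowFile);
        return session.transfer(flowFile) + "|" + session.transfer(other);
    }

    public static void main(String[] args) {
        System.out.println(run("OLD", false)); // NEW_DATA|OLD
        System.out.println(run("OLD", true));  // NEW_DATA|NEW_DATA
    }
}
```

With `reassign` set to false the model reproduces the reported "NEW_DATA" / "OLD" split; with it set to true both outputs match. In the real processor the analogous change is to keep the returned reference, for example `flowFile = session.write(flowFile, ...)`, before calling `session.clone(flowFile)`.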

Error:

2017-11-22 16:56:19,326 WARN [Timer-Driven Process Thread-10] o.a.n.c.t.ContinuallyRunProcessorTask
org.apache.nifi.processor.exception.FlowFileHandlingException: Specified offset of 0 and size 9 exceeds size of StandardFlowFileRecord[uuid=5d279723-64a3-4ad2-a8f6-3f974baf38ce,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1511369750666-1, container=default, section=1], offset=51, length=8],offset=0,name=29102663649743,size=8]
        at org.apache.nifi.controller.repository.StandardProcessSession.clone(StandardProcessSession.java:1672)
        at org.apache.nifi.controller.repository.StandardProcessSession.clone(StandardProcessSession.java:1662)
        at com.baesystemsai.nifi.utility.CloneTestProcessor.onTrigger(CloneTestProcessor.java:59)
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1119)
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:147)
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:128)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
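The log reports a requested size of 9 bytes checked against a record with `size=8`, which is the length of "NEW_DATA". One reading that is consistent with the message, though not verified against the NiFi 1.4.0 source, is that the clone range comes from the stale reference (the original 9-byte input) while the bound comes from the rewritten record. The sketch below reproduces only that arithmetic; `CloneRangeCheck`, `checkRange` and `fits` are hypothetical helpers, not NiFi code.

```java
// Hypothetical helper mirroring only the arithmetic of the logged check; not NiFi code.
public class CloneRangeCheck {

    // Throws if the requested [offset, offset + size) range does not fit in currentSize bytes.
    public static void checkRange(long offset, long size, long currentSize) {
        if (offset + size > currentSize) {
            throw new IllegalArgumentException("Specified offset of " + offset
                    + " and size " + size + " exceeds size of " + currentSize);
        }
    }

    // Returns true when the range check passes, false when it would throw.
    public static boolean fits(long offset, long size, long currentSize) {
        try {
            checkRange(offset, size, currentSize);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        long current = "NEW_DATA".length(); // 8 bytes after the write
        System.out.println(fits(0, 3, current));       // "OLD" input: 3 <= 8, prints true
        System.out.println(fits(0, 9, current));       // 9-byte input: 9 > 8, prints false
        System.out.println(fits(0, current, current)); // size taken from the updated reference, prints true
    }
}
```

Under this reading, any input of 8 bytes or fewer (such as "OLD") passes the check, which would be consistent with only the longer inputs failing.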

Re: NiFi FlowFile state when cloning

Mark Payne
Matthew,

It's awesome that you've got these kinds of tests running! Thanks for shooting a note
to the dev list. After a quick glance at the code, I can understand why the second case
(input longer than 8 bytes) throws an Exception. I'm not sure yet why the first case
occurs, but I am happy to look into it.

I've created a JIRA [1] to track this.

Thanks
-Mark

 [1] https://issues.apache.org/jira/browse/NIFI-4633


(In reply to Matthew Watson's message of Nov 22, 2017, 11:57 AM.)