is not the most recent version of this FlowFile within this session

plj
Howdy,

  I'm trying to write JSON to a FlowFile and I get the error "is not the most recent version of this FlowFile within this session". I'm not sure what I'm doing wrong. Here is a snippet:
    FlowFile split = processSession.create(original);
    split = processSession.write(split, new OutputStreamCallback() {
        @Override
        public void process(OutputStream out) throws IOException {
            String content = baos.toString("UTF-8");
            String json = toJSON(content);
            ObjectMapper mapper = new ObjectMapper();
            JsonNode actualObj = mapper.readTree(json);
            logger.info("JSON METADATA \n" + mapper.defaultPrettyPrintingWriter().writeValueAsString(actualObj));
            out.write(mapper.defaultPrettyPrintingWriter().writeValueAsString(actualObj).getBytes(StandardCharsets.UTF_8));
        }
    });


The JSON is written out to the log.

In onTrigger(...), "meta" is set to the FlowFile "split" from above and then transferred:

        processSession.transfer(meta, REL_METADATA);

And I get this error:

2015-09-16 16:36:48,482 ERROR [Timer-Driven Process Thread-9] org.mitre.nifi.NiFiNITFReader NiFiNITFReader[id=a615ceb9-fa67-4823-9eb0-310ebada5488] NiFiNITFReader[id=a615ceb9-fa67-4823-9eb0-310ebada5488] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=865d7aac-994c-41ac-a34c-599d92df31b9,claim=1442435806141-1,offset=0,name=i_3001a.ntf,size=1049479] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=2]); rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=865d7aac-994c-41ac-a34c-599d92df31b9,claim=1442435806141-1,offset=0,name=i_3001a.ntf,size=1049479] is not the most recent version of this FlowFile within this session (StandardProcessSession[id=2])



thoughts?

thank you
Re: is not the most recent version of this FlowFile within this session

Michael Kobit
It is hard to tell without the rest of your code, but from what you have
shown and the way you explain it, this error should not happen.

I've seen this problem when you call processSession.write(flowFile,
outputStream -> {}) and don't capture the returned `flowFile`. Are you
writing to a flowFile anywhere else and not using the returned value?


Re: is not the most recent version of this FlowFile within this session

Mark Payne
Michael, I think you have the premise exactly right. It's not just when you call session.write, though. It applies to any call
that you make to update a FlowFile. For example, if you call session.putAttribute or session.putAllAttributes, those methods
will also return a new FlowFile object that you must capture / use.
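
To make the rule concrete, here is a rough sketch in plain Java. It is a toy model, not NiFi's actual implementation: ToySession and ToyFlowFile are made-up names, and the real ProcessSession does far more. It only shows the pattern: every update returns a new immutable object, the session remembers the newest one, and passing an older reference is rejected.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class StaleFlowFileDemo {

    /** Hypothetical immutable snapshot; every update yields a new instance. */
    static final class ToyFlowFile {
        final long id;
        final int version;
        final Map<String, String> attributes;

        ToyFlowFile(long id, int version, Map<String, String> attributes) {
            this.id = id;
            this.version = version;
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }
    }

    /** Hypothetical session; tracks the newest snapshot per id and rejects older ones. */
    static final class ToySession {
        private final Map<Long, ToyFlowFile> latest = new HashMap<>();
        private long nextId = 1;

        ToyFlowFile create() {
            ToyFlowFile ff = new ToyFlowFile(nextId++, 0, new HashMap<>());
            latest.put(ff.id, ff);
            return ff;
        }

        ToyFlowFile putAttribute(ToyFlowFile ff, String key, String value) {
            requireLatest(ff);
            Map<String, String> attrs = new HashMap<>(ff.attributes);
            attrs.put(key, value);
            ToyFlowFile updated = new ToyFlowFile(ff.id, ff.version + 1, attrs);
            latest.put(ff.id, updated); // the old reference is now stale
            return updated;
        }

        void transfer(ToyFlowFile ff) {
            requireLatest(ff);
            latest.remove(ff.id);
        }

        private void requireLatest(ToyFlowFile ff) {
            if (latest.get(ff.id) != ff) {
                throw new IllegalStateException(
                        "not the most recent version of this FlowFile within this session");
            }
        }
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();

        // Wrong: the return value is dropped, so 'stale' still points at version 0.
        ToyFlowFile stale = session.create();
        session.putAttribute(stale, "mime.type", "application/json");
        try {
            session.transfer(stale);
        } catch (IllegalStateException e) {
            System.out.println("stale reference rejected: " + e.getMessage());
        }

        // Right: reassign on every update, then transfer the newest reference.
        ToyFlowFile fresh = session.create();
        fresh = session.putAttribute(fresh, "mime.type", "application/json");
        session.transfer(fresh);
        System.out.println("fresh reference transferred, version " + fresh.version);
    }
}
```

In a real processor the same habit applies: write `flowFile = session.putAttribute(flowFile, ...)` (or write, putAllAttributes, and so on) every time, and only pass that latest reference to transfer.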




Re: is not the most recent version of this FlowFile within this session

plj
Thank you for your help. I was indeed doing other things with flow files. I’ve gotten rid of that error, but I’m now getting a different one: "transfer relationship not specified".

2015-09-17 13:34:53,995 ERROR [Timer-Driven Process Thread-9] org.mitre.nifi.NiFiNITFReader NiFiNITFReader[id=a615ceb9-fa67-4823-9eb0-310ebada5488] NiFiNITFReader[id=a615ceb9-fa67-4823-9eb0-310ebada5488] failed to process due to org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=d6e9a5bb-44f9-4e62-ae62-d41eef4635d4,claim=1442511293801-1,offset=0,name=i_3001a.ntf,size=1049479] transfer relationship not specified; rolling back session: org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=d6e9a5bb-44f9-4e62-ae62-d41eef4635d4,claim=1442511293801-1,offset=0,name=i_3001a.ntf,size=1049479] transfer relationship not specified

I’ve cut my program down to just a test case, but I don’t see what I’m doing wrong.

My code is now:


    public static final Relationship REL_METADATA = new Relationship.Builder()
            .name("metadata")
            .description("All metadata of the original FlowFile will be routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("If a FlowFile fails processing for any reason  it will be routed to this relationship")
            .build();
    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;
    private static Stack<String> stack = new Stack<String>();
    private FlowFile meta = null;


    @Override
    protected void init(final ProcessorInitializationContext context) {
        // relationships
        final Set<Relationship> procRels = new HashSet<Relationship>();
//        procRels.add(REL_SUCCESS);
        procRels.add(REL_FAILURE);
        procRels.add(REL_METADATA);
        relationships = Collections.unmodifiableSet(procRels);
        final List<PropertyDescriptor> properties = new ArrayList<>();
        this.properties = Collections.unmodifiableList(properties);
    }


    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        final ProcessorLog logger = getLogger();
        logger.info("IN onTRIGGER");
        final FlowFile original = processSession.get();
        if (original == null) {
            return;
        }
        try {
            meta = processSession.create(original);
            meta = processSession.write(meta, new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    String content = "THIS IS META DATA ";
                    out.write(content.getBytes(StandardCharsets.UTF_8)); // just for testing
                }
            });
            processSession.transfer(meta, REL_METADATA);
        } catch (Exception e) {
            e.printStackTrace();
            processSession.transfer(original, REL_FAILURE);
        }
    }



Any thoughts?

thank you