Nifi:how to handle flowfile data updateing and reitreving?

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Nifi:how to handle flowfile data updateing and reitreving?

sally

0
down vote
favorite
I want to reitreve data from folder and then update it and put it back to
its' destination and i want to make this operation many times , here is my
code i try to getfile and update it , but it doesn't roll back the file and
can't reitreve data with the same filename(sometimes when i start this
processor first time it reireives data and then rolls back updated one but
then it remembers state or flowfile information i guess and doen't reitreive
same updated file) , can someone help me what should i change to make this
code work?

here is error i got :2017-10-08 21:40:55,959 ERROR [Timer-Driven Process
Thread-9] Reader.MyProcessor
MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67]
MyProcessor[id=fcaf839f-015e-1000-da5d-a3256b960a67] failed to process due
to java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are
created in this Session back to self; rolling back session: {}
java.lang.IllegalArgumentException: Cannot transfer FlowFiles that are
created in this Session back to self at
org.apache.nifi.controller.repository.StandardProcessSession.transfer(StandardProcessSession.java:1848)
at Reader.MyProcessor.onTrigger(MyProcessor.java:732)



public class MyProcessor extends AbstractProcessor {

    public String start, startDate, endDate, makeVersion, runAs, patch;

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
        final StopWatch stopWatch = new StopWatch(true);
        final File directory = new
File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());
        final boolean keepingSourceFile =
context.getProperty(KEEP_SOURCE_FILE).asBoolean();
        final String conflictResponse =
context.getProperty(CONFLICT_RESOLUTION).getValue();
        final Integer maxDestinationFiles = 30;
        final ComponentLog logger = getLogger();

        if (fileQueue.size() < 100) {
            final long pollingMillis =
context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
            if ((queueLastUpdated.get() < System.currentTimeMillis() -
pollingMillis) && listingLock.tryLock()) {
                try {
                    final ArrayList<File> listing =
performListing(directory, fileFilterRef.get(),
context.getProperty(RECURSE).asBoolean().booleanValue());

                    queueLock.lock();
                    try {
                        listing.removeAll(inProcess);
                        if (!keepingSourceFile) {
                            listing.removeAll(recentlyProcessed);
                        }

                        fileQueue.clear();
                        fileQueue.addAll(listing);

                        queueLastUpdated.set(System.currentTimeMillis());
                        recentlyProcessed.clear();

                        if (listing.isEmpty()) {
                            context.yield();
                        }
                    } finally {
                        queueLock.unlock();
                    }
                } finally {
                    listingLock.unlock();
                }
            }
        }

        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
        final List<File> files = new ArrayList<>(batchSize);
        queueLock.lock();
        try {
            fileQueue.drainTo(files, batchSize);
            if (files.isEmpty()) {
                return;
            } else {
                inProcess.addAll(files);
            }
        } finally {
            queueLock.unlock();
        }
        //make  xml parsing
        DocumentBuilderFactory dbFactory =
DocumentBuilderFactory.newInstance();


        try {
            dBuilder = dbFactory.newDocumentBuilder();
        } catch (ParserConfigurationException e) {
            e.printStackTrace();
        }

        try {
            File f = files.get(0);
            doc = dBuilder.parse(f);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (org.xml.sax.SAXException e) {
            e.printStackTrace();
        }
        NodeList nList = doc.getElementsByTagName("localAttributes");
        for (int temp = 0; temp < nList.getLength(); temp++) {

            Node nNode = nList.item(temp);


            if (nNode.getNodeType() == Node.ELEMENT_NODE) {

                Element eElement = (Element) nNode;


                start =
eElement.getElementsByTagName("start").item(0).getTextContent();
                startDate =
eElement.getElementsByTagName("startDate").item(0).getTextContent();
                endDate =
eElement.getElementsByTagName("endDate").item(0).getTextContent();
                patch =
eElement.getElementsByTagName("patch").item(0).getTextContent();
                runAs =
eElement.getElementsByTagName("runAs").item(0).getTextContent();
                makeVersion =
eElement.getElementsByTagName("makeVersion").item(0).getTextContent();

            }
        }

        final ListIterator<File> itr = files.listIterator();
        FlowFile flowFile = null;
        try {
            final Path directoryPath = directory.toPath();
            while (itr.hasNext()) {
                final File file = itr.next();
                final Path filePath = file.toPath();
                final Path relativePath =
directoryPath.relativize(filePath.getParent());
                String relativePathString = relativePath.toString() + "/";
                if (relativePathString.isEmpty()) {
                    relativePathString = "./";
                }
                final Path absPath = filePath.toAbsolutePath();
                final String absPathString = absPath.getParent().toString()
+ "/";

                flowFile = session.create();
                final long importStart = System.nanoTime();
                flowFile = session.importFrom(filePath, keepingSourceFile,
flowFile);
                final long importNanos = System.nanoTime() - importStart;
                final long importMillis =
TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);

                flowFile = session.putAttribute(flowFile,
CoreAttributes.FILENAME.key(), file.getName());
                flowFile = session.putAttribute(flowFile,
CoreAttributes.PATH.key(), relativePathString);
                flowFile = session.putAttribute(flowFile,
CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                Map<String, String> attributes =
getAttributesFromFile(filePath);
                if (attributes.size() > 0) {
                    flowFile = session.putAllAttributes(flowFile,
attributes);
                }
                FlowFile flowFile1 = session.create();
                flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.FILENAME.key(), file.getName());
                flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.PATH.key(), relativePathString);
                flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
                flowFile1 = session.putAttribute(flowFile1, "start", start);
                flowFile1 = session.putAttribute(flowFile1, "startDate",
startDate);
                flowFile1 = session.putAttribute(flowFile1, "endDate",
endDate);
                flowFile1 = session.putAttribute(flowFile1, "runAs", runAs);
                flowFile1 = session.putAttribute(flowFile1, "patch", patch);
                flowFile1 = session.putAttribute(flowFile1, "makeVersion",
makeVersion);

                flowFile1 = session.putAttribute(flowFile1, "filename",
"Configuration");
                //session.getProvenanceReporter().receive(flowFile1,
file.toURI().toString(), importMillis);


                InputStream ffStream = session.read(flowFile);
                DocumentBuilderFactory builderFactory =
DocumentBuilderFactory.newInstance();
                DocumentBuilder builder =
builderFactory.newDocumentBuilder();
                Document xmlDocument = builder.parse(ffStream);

                XPath xPath = XPathFactory.newInstance().newXPath();
                XPathExpression myNodeList = (XPathExpression)
xPath.compile("/localAttributes");
                Node nodeGettingChanged = (Node)
myNodeList.evaluate(xmlDocument, XPathConstants.NODE);
                NodeList childNodes = nodeGettingChanged.getChildNodes();
                for (int i = 0; i != childNodes.getLength(); ++i) {
                    Node child = childNodes.item(i);
                    if (!(child instanceof Element))
                        continue;

                    if (child.getNodeName().equals("runAs"))
                        child.getFirstChild().setNodeValue("false");

                }
                TransformerFactory transformerFactory =
TransformerFactory.newInstance();
                Transformer transformer = null;
                transformer = transformerFactory.newTransformer();
                DOMSource source = new DOMSource(xmlDocument);
                String path =
"C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1//conf.xml";
                File f = new File(path);
                StreamResult file1 = new StreamResult(f);
                try {
                    transformer.transform(source, file1);
                } catch (TransformerException e) {
                    e.printStackTrace();
                }


                session.write(flowFile, new StreamCallback() {
                    @Override
                    public void process(InputStream inputStream,
OutputStream outputStream) throws IOException {
                        TransformerFactory transformerFactory =
TransformerFactory.newInstance();
                        Transformer transformer = null;
                        try {
                            transformer =
transformerFactory.newTransformer();
                        } catch (TransformerConfigurationException e) {
                            e.printStackTrace();
                        }
                        DOMSource source = new DOMSource(xmlDocument);
                        ffStream.close();
                        ByteArrayOutputStream bos = new
ByteArrayOutputStream();
                        StreamResult result = new StreamResult(bos);


                        try {
                            transformer.transform(source, result);
                        } catch (TransformerException e) {
                            e.printStackTrace();
                        }
                        byte[] array = bos.toByteArray();
                        outputStream.write(array);
                    }

                });






                Path tempDotCopyFile = null;
                try {
                    final Path rootDirPath =
Paths.get("C://Users//user//Desktop//nifi-1.3.0//nifi-assembly//target//nifi-1.3.0-bin//nifi-1.3.0//1");
                    final Path tempCopyFile = rootDirPath.resolve("." +
flowFile.getAttribute(CoreAttributes.FILENAME.key()));
                    final Path copyFile =
rootDirPath.resolve(flowFile.getAttribute(CoreAttributes.FILENAME.key()));

                    if (!Files.exists(rootDirPath)) {
                        if (context.getProperty(CREATE_DIRS).asBoolean()) {
                            Files.createDirectories(rootDirPath);
                        } else {
                            flowFile = session.penalize(flowFile);
                            session.transfer(flowFile);
                            logger.error("Penalizing {} and routing to
'failure' because the output directory {} does not exist and Processor is "
                                    + "configured not to create missing
directories", new Object[]{flowFile1, rootDirPath});
                            return;
                        }
                    }

                    final Path dotCopyFile = tempCopyFile;
                    tempDotCopyFile = dotCopyFile;
                    Path finalCopyFile = copyFile;

                    final Path finalCopyFileDir = finalCopyFile.getParent();
                    if (Files.exists(finalCopyFileDir) &&
maxDestinationFiles != null) { // check if too many files already
                        final int numFiles =
finalCopyFileDir.toFile().list().length;

                        if (numFiles >= maxDestinationFiles) {
                            flowFile= session.penalize(flowFile);
                            logger.warn("Penalizing {} and routing to
'failure' because the output directory {} has {} files, which exceeds the "
                                    + "configured maximum number of files",
new Object[]{flowFile, finalCopyFileDir, numFiles});
                            session.transfer(flowFile);
                            return;
                        }
                    }

                    if (Files.exists(finalCopyFile)) {
                        switch (conflictResponse) {
                            case REPLACE_RESOLUTION:
                                Files.delete(finalCopyFile);
                                logger.info("Deleted {} as configured in
order to replace with the contents of {}", new Object[]{finalCopyFile,
flowFile});
                                break;
                            case IGNORE_RESOLUTION:
                                session.transfer(flowFile, REL_SUCCESS);
                                logger.info("Transferring {} to success
because file with same name already exists", new Object[]{flowFile});
                                return;
                            case FAIL_RESOLUTION:
                                flowFile = session.penalize(flowFile);
                                logger.warn("Penalizing {} and routing to
failure as configured because file with the same name already exists", new
Object[]{flowFile});
                                session.transfer(flowFile);
                                return;
                            default:
                                break;
                        }
                    }

                    session.exportTo(flowFile, dotCopyFile, false);

                    final String permissions =
context.getProperty(CHANGE_PERMISSIONS).evaluateAttributeExpressions(flowFile1).getValue();
                    if (permissions != null &&
!permissions.trim().isEmpty()) {
                        try {
                            String perms = stringPermissions(permissions);
                            if (!perms.isEmpty()) {
                                Files.setPosixFilePermissions(dotCopyFile,
PosixFilePermissions.fromString(perms));
                            }
                        } catch (Exception e) {
                            logger.warn("Could not set file permissions to
{} because {}", new Object[]{permissions, e});
                        }
                    }

                    final String owner =
context.getProperty(CHANGE_OWNER).evaluateAttributeExpressions(flowFile1).getValue();
                    if (owner != null && !owner.trim().isEmpty()) {
                        try {
                            UserPrincipalLookupService lookupService =
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
                            Files.setOwner(dotCopyFile,
lookupService.lookupPrincipalByName(owner));
                        } catch (Exception e) {
                            logger.warn("Could not set file owner to {}
because {}", new Object[]{owner, e});
                        }
                    }

                    final String group =
context.getProperty(CHANGE_GROUP).evaluateAttributeExpressions(flowFile1).getValue();
                    if (group != null && !group.trim().isEmpty()) {
                        try {
                            UserPrincipalLookupService lookupService =
dotCopyFile.getFileSystem().getUserPrincipalLookupService();
                            PosixFileAttributeView view =
Files.getFileAttributeView(dotCopyFile, PosixFileAttributeView.class);
                           
view.setGroup(lookupService.lookupPrincipalByGroupName(group));
                        } catch (Exception e) {
                            logger.warn("Could not set file group to {}
because {}", new Object[]{group, e});
                        }
                    }

                    boolean renamed = false;
                    for (int i = 0; i < 10; i++) { // try rename up to 10
times.
                        if
(dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
                            renamed = true;
                            break;// rename was successful
                        }
                        Thread.sleep(100L);// try waiting a few ms to let
whatever might cause rename failure to resolve
                    }

                    if (!renamed) {
                        if (Files.exists(dotCopyFile) &&
dotCopyFile.toFile().delete()) {
                            logger.debug("Deleted dot copy file {}", new
Object[]{dotCopyFile});
                        }
                        throw new ProcessException("Could not rename: " +
dotCopyFile);
                    } else {
                        logger.info("Produced copy of {} at location {}",
new Object[]{flowFile1, finalCopyFile});
                    }

                    /*session.getProvenanceReporter().send(flowFile,
finalCopyFile.toFile().toURI().toString(),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                    session.transfer(flowFile, REL_SUCCESS);*/
                    session.getProvenanceReporter().receive(flowFile1,
file.toURI().toString(), importMillis);
                    session.transfer(flowFile1, REL_SUCCESS);
                    session.remove(flowFile);
                } catch (final Throwable t) {
                    if (tempDotCopyFile != null) {
                        try {
                            Files.deleteIfExists(tempDotCopyFile);
                        } catch (final Exception e) {
                            logger.error("Unable to remove temporary file {}
due to {}", new Object[]{tempDotCopyFile, e});
                        }
                    }

                    flowFile = session.penalize(flowFile1);
                    logger.error("Penalizing {} and transferring to failure
due to {}", new Object[]{flowFile1, t});
                    session.transfer(flowFile1);
                }
            }




            if (!isScheduled()) {  // if processor stopped, put the rest of
the files back on the queue.
                queueLock.lock();
                try {
                    while (itr.hasNext()) {
                        final File nextFile = itr.next();
                        fileQueue.add(nextFile);
                        inProcess.remove(nextFile);
                    }
                } finally {
                    queueLock.unlock();
                }
            }
        } catch (IOException e1) {
            e1.printStackTrace();
        } catch (TransformerConfigurationException e1) {
            e1.printStackTrace();
        } catch (ParserConfigurationException e1) {
            e1.printStackTrace();
        } catch (SAXException e1) {
            e1.printStackTrace();
        } catch (XPathExpressionException e1) {
            e1.printStackTrace();
        }
        session.commit();
    }


    protected String stringPermissions(String perms) {
        String permissions = "";
        final Pattern rwxPattern = Pattern.compile("^[rwx-]{9}$");
        final Pattern numPattern = Pattern.compile("\\d+");
        if (rwxPattern.matcher(perms).matches()) {
            permissions = perms;
        } else if (numPattern.matcher(perms).matches()) {
            try {
                int number = Integer.parseInt(perms, 8);
                StringBuilder permBuilder = new StringBuilder();
                if ((number & 0x100) > 0) {
                    permBuilder.append('r');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x80) > 0) {
                    permBuilder.append('w');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x40) > 0) {
                    permBuilder.append('x');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x20) > 0) {
                    permBuilder.append('r');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x10) > 0) {
                    permBuilder.append('w');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x8) > 0) {
                    permBuilder.append('x');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x4) > 0) {
                    permBuilder.append('r');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x2) > 0) {
                    permBuilder.append('w');
                } else {
                    permBuilder.append('-');
                }
                if ((number & 0x8) > 0) {
                    permBuilder.append('x');
                } else {
                    permBuilder.append('-');
                }
                permissions = permBuilder.toString();
            } catch (NumberFormatException ignore) {
            }
        }
        return permissions;
    }

}



--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/