NIFI: how to update config data from custpm processor

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

NIFI: how to update config data from custpm processor

sally
n my custom processor i need to update config file(is placed in one of nifi
folder not original config it is created by me :D ) based on data which i
get from upstream connection but i can't get nor error neither desired
result what should i do,

1.is there any way i can controll upstream connection flowfile destination i
mean when i make debugging i saw that fileQueue.drainTo(file, batchSize); in
this part file was null
2.here is one more thing i am interested in, on this line session.commit() i
always get errors and whole operation is failed ,should i clean fileQueue
and then make session.commit()?

FlowFile flowfile; @Override public void onTrigger(ProcessContext context,
ProcessSession session) throws ProcessException { final String
conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
final ArrayList value = new ArrayList<>();
    flowfile = session.get();
    if (flowfile == null) {
      return;
    }
    value.add(flowfile.getAttribute("filename"));
    session.remove(flowfile);


    final File directory = new
File(context.getProperty(DIRECTORY).evaluateAttributeExpressions().getValue());


    final boolean keepingSourceFile =
context.getProperty(KEEP_SOURCE_FILE).asBoolean();
    final ComponentLog logger = getLogger();


    if (fileQueue.size() < 100) {
        final long pollingMillis =
context.getProperty(POLLING_INTERVAL).asTimePeriod(TimeUnit.MILLISECONDS);
        if ((queueLastUpdated.get() < System.currentTimeMillis() -
pollingMillis) && listingLock.tryLock()) {
            try {
                final Set<File> listing = performListing(directory,
fileFilterRef.get(),
context.getProperty(RECURSE).asBoolean().booleanValue());


                queueLock.lock();
                try {
                    listing.removeAll(inProcess);
                    if (!keepingSourceFile) {
                        listing.removeAll(recentlyProcessed);
                    }


                    fileQueue.clear();
                    fileQueue.addAll(listing);


                    queueLastUpdated.set(System.currentTimeMillis());
                    recentlyProcessed.clear();


                    if (listing.isEmpty()) {
                        context.yield();
                    }
                } finally {
                    queueLock.unlock();
                }
            } finally {
                listingLock.unlock();
            }
        }
    }


    final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    final List<File> file = new ArrayList<>(batchSize);


    queueLock.lock();
    try {
        fileQueue.drainTo(file, batchSize);
        if (file.isEmpty()) {
            return;
        } else {
            inProcess.addAll(file);
        }
    } finally {
        queueLock.unlock();
    }


    //make  xml parsing
    DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();




    try {
        dBuilder = dbFactory.newDocumentBuilder();
    } catch (ParserConfigurationException e) {
        e.printStackTrace();
    }


    try {
        File f = file.get(0);
        doc = dBuilder.parse(f);
    } catch (IOException e) {
        e.printStackTrace();
    } catch (org.xml.sax.SAXException e) {
        e.printStackTrace();
    }
    NodeList nList = doc.getElementsByTagName("localAttributes");
    for (int temp = 0; temp < nList.getLength(); temp++) {


        Node nNode = nList.item(temp);




        if (nNode.getNodeType() == Node.ELEMENT_NODE) {


            Element eElement = (Element) nNode;




            start =
eElement.getElementsByTagName("start").item(0).getTextContent();
            startDate =
eElement.getElementsByTagName("startDate").item(0).getTextContent();
            endDate =
eElement.getElementsByTagName("endDate").item(0).getTextContent();
            patch =
eElement.getElementsByTagName("patch").item(0).getTextContent();
            runAs =
eElement.getElementsByTagName("runAs").item(0).getTextContent();
            makeVersion =
eElement.getElementsByTagName("makeVersion").item(0).getTextContent();
            ///parameter =
eElement.getElementsByTagName("parameter").item(0).getTextContent();




        }
    }
    final ListIterator<File> itr = file.listIterator();
    FlowFile flowFile1 = null;
    try {
        final Path directoryPath = directory.toPath();
        while (itr.hasNext()) {
            final File files = itr.next();
            final Path filePath = files.toPath();
            final Path relativePath =
directoryPath.relativize(filePath.getParent());
            String relativePathString = relativePath.toString() + "/";
            if (relativePathString.isEmpty()) {
                relativePathString = "./";
            }
            final Path absPath = filePath.toAbsolutePath();
            final String absPathString = absPath.getParent().toString() +
"/";


            flowFile1 = session.create();
            final long importStart = System.nanoTime();
            flowFile1 = session.importFrom(filePath, keepingSourceFile,
flowFile1);
            final long importNanos = System.nanoTime() - importStart;
            final long importMillis =
TimeUnit.MILLISECONDS.convert(importNanos, TimeUnit.NANOSECONDS);


            flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.FILENAME.key(), files.getName());
            flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.PATH.key(), relativePathString);
            flowFile1 = session.putAttribute(flowFile1,
CoreAttributes.ABSOLUTE_PATH.key(), absPathString);
            Map<String, String> attributes =
getAttributesFromFile(filePath);
            if (attributes.size() > 0) {
                flowFile1 = session.putAllAttributes(flowFile1, attributes);
            }


            InputStream ffStream = session.read(flowFile1);
            DocumentBuilderFactory builderFactory =
DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = builderFactory.newDocumentBuilder();
            Document xmlDocument = builder.parse(ffStream);


            XPath xPath = XPathFactory.newInstance().newXPath();
            XPathExpression myNodeList = (XPathExpression)
xPath.compile("/localAttributes");
            Node nodeGettingChanged = (Node)
myNodeList.evaluate(xmlDocument, XPathConstants.NODE);
            NodeList childNodes = nodeGettingChanged.getChildNodes();
            boolean  make=false;
            for (int i = 0; i != childNodes.getLength(); ++i) {
                Node child = childNodes.item(i);
                if (!(child instanceof Element))
                    continue;
                if(child.getNodeName().equals("start")){
                    String date;
                    for(int  j=0;j<value.size();j++) {
                        if(value.get(j).length()>10){
                            date=value.get(j).substring(0,10);
                        }
                        else{
                            date=value.get(j);
                        }
                        if (date == child.getFirstChild().getTextContent()){
                           
child.getFirstChild().setNodeValue(addOneDay(child.getFirstChild().getTextContent()));
                            make=true;


                        }
                    }
                }
                if(make){
                    if(child.getNodeName().equals("runAs")){
                        child.getFirstChild().setNodeValue("true");
                    }
                }






            }
            TransformerFactory transformerFactory =
TransformerFactory.newInstance();
            Transformer transformer = null;
            transformer = transformerFactory.newTransformer();
            DOMSource source = new DOMSource(xmlDocument);
            String path =
"C:/Users/user/Desktop/nifi-1.3.0/nifi-assembly/target/nifi-1.3.0-bin/nifi-1.3.0/1/conf.xml";
            File f = new File(path);
            StreamResult file1 = new StreamResult(f);
            try {
                transformer.transform(source, file1);
            } catch (TransformerException e) {
                e.printStackTrace();
            }




            session.write(flowFile1, new StreamCallback() {
                @Override
                public void process(InputStream inputStream, OutputStream
outputStream) throws IOException {
                    TransformerFactory transformerFactory =
TransformerFactory.newInstance();
                    Transformer transformer = null;
                    try {
                        transformer = transformerFactory.newTransformer();
                    } catch (TransformerConfigurationException e) {
                        e.printStackTrace();
                    }
                    DOMSource source = new DOMSource(xmlDocument);
                    ffStream.close();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    StreamResult result = new StreamResult(bos);




                    try {
                        transformer.transform(source, result);
                    } catch (TransformerException e) {
                        e.printStackTrace();
                    }
                    byte[] array = bos.toByteArray();
                    outputStream.write(array);
                }


            });


            Path tempDotCopyFile = null;
            try {
                final Path rootDirPath =
Paths.get("C://Users//s.tkhilaishvili//Desktop//try2//nifi-1.3.0//1");
                final Path tempCopyFile = rootDirPath.resolve("." +
flowFile1.getAttribute(CoreAttributes.FILENAME.key()));
                final Path copyFile =
rootDirPath.resolve(flowFile1.getAttribute(CoreAttributes.FILENAME.key()));


                if (!Files.exists(rootDirPath)) {
                    if (context.getProperty(CREATE_DIRS).asBoolean()) {
                        Files.createDirectories(rootDirPath);
                    } else {
                        flowFile1 = session.penalize(flowFile1);
                        session.transfer(flowFile1,REL_FAILURE);
                        logger.error("Penalizing {} and routing to 'failure'
because the output directory {} does not exist and Processor is "
                                + "configured not to create missing
directories", new Object[]{flowFile1, rootDirPath});
                        return;
                    }
                }


                final Path dotCopyFile = tempCopyFile;
                tempDotCopyFile = dotCopyFile;
                Path finalCopyFile = copyFile;


                final Path finalCopyFileDir = finalCopyFile.getParent();
                if (Files.exists(finalCopyFileDir)) { // check if too many
files already
                    final int numFiles =
finalCopyFileDir.toFile().list().length;


                    if (numFiles >= 34) {
                        flowFile1 = session.penalize(flowFile1);
                        logger.warn("Penalizing {} and routing to 'failure'
because the output directory {} has {} files, which exceeds the "
                                + "configured maximum number of files", new
Object[]{flowFile1, finalCopyFileDir, numFiles});
                        session.transfer(flowFile1,REL_FAILURE);
                        return;
                    }
                }


                if (Files.exists(finalCopyFile)) {
                    switch (conflictResponse) {
                        case REPLACE_RESOLUTION:
                            Files.delete(finalCopyFile);
                            logger.info("Deleted {} as configured in order
to replace with the contents of {}", new Object[]{finalCopyFile,
flowFile1});
                            break;
                        case IGNORE_RESOLUTION:
                            session.transfer(flowFile1, REL_SUCCESS);
                            logger.info("Transferring {} to success because
file with same name already exists", new Object[]{flowFile1});
                            return;
                        case FAIL_RESOLUTION:
                            flowFile1 = session.penalize(flowFile1);
                            logger.warn("Penalizing {} and routing to
failure as configured because file with the same name already exists", new
Object[]{flowFile1});
                            session.transfer(flowFile1,REL_FAILURE);
                            return;
                        default:
                            break;
                    }
                }


                session.exportTo(flowFile1, dotCopyFile, false);


                final String permissions = "-rwxrwx---";
                if (permissions != null && !permissions.trim().isEmpty()) {
                    try {
                        String perms = stringPermissions(permissions);
                        if (!perms.isEmpty()) {
                            Files.setPosixFilePermissions(dotCopyFile,
PosixFilePermissions.fromString(perms));
                        }
                    } catch (Exception e) {
                        logger.warn("Could not set file permissions to {}
because {}", new Object[]{permissions, e});
                    }
                }










                boolean renamed = false;
                for (int i = 0; i < 10; i++) { // try rename up to 10 times.
                    if
(dotCopyFile.toFile().renameTo(finalCopyFile.toFile())) {
                        renamed = true;
                        break;// rename was successful
                    }
                    Thread.sleep(100L);// try waiting a few ms to let
whatever might cause rename failure to resolve
                }


                if (!renamed) {
                    if (Files.exists(dotCopyFile) &&
dotCopyFile.toFile().delete()) {
                        logger.debug("Deleted dot copy file {}", new
Object[]{dotCopyFile});
                    }
                    throw new ProcessException("Could not rename: " +
dotCopyFile);
                } else {
                    logger.info("Produced copy of {} at location {}", new
Object[]{flowFile1, finalCopyFile});
                }


                /*session.getProvenanceReporter().send(flowFile,
finalCopyFile.toFile().toURI().toString(),
stopWatch.getElapsed(TimeUnit.MILLISECONDS));
                session.transfer(flowFile, REL_SUCCESS);*/
                session.getProvenanceReporter().receive(flowFile1,
files.toURI().toString(), importMillis);
                //session.transfer(flowFile1, REL_SUCCESS);
                session.remove(flowFile1);
            } catch (final Throwable t) {
                if (tempDotCopyFile != null) {
                    try {
                        Files.deleteIfExists(tempDotCopyFile);
                    } catch (final Exception e) {
                        logger.error("Unable to remove temporary file {} due
to {}", new Object[]{tempDotCopyFile, e});
                    }
                }


                flowFile1 = session.penalize(flowFile1);
                logger.error("Penalizing {} and transferring to failure due
to {}", new Object[]{flowFile1, t});
                session.transfer(flowFile1,REL_FAILURE);
            }
        }








        if (!isScheduled()) {  // if processor stopped, put the rest of the
files back on the queue.
            queueLock.lock();
            try {
                while (itr.hasNext()) {
                    final File nextFile = itr.next();
                    fileQueue.add(nextFile);
                    inProcess.remove(nextFile);
                }
            } finally {
                queueLock.unlock();
            }
        }
    } catch (IOException e1) {
        e1.printStackTrace();
    } catch (TransformerConfigurationException e1) {
        e1.printStackTrace();
    } catch (ParserConfigurationException e1) {
        e1.printStackTrace();
    } catch (XPathExpressionException e1) {
        e1.printStackTrace();
    } catch (org.xml.sax.SAXException e) {
        e.printStackTrace();
    }
    session.commit();
}



--
Sent from: http://apache-nifi-developer-list.39713.n7.nabble.com/