NIFI Unit Test Error with @OnScheduled annotation

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

NIFI Unit Test Error with @OnScheduled annotation

Sandish Kumar HN
Hi,


*I'm stuck with below error:*


java.lang.AssertionError: Could not invoke methods annotated with
@OnScheduled annotation due to: java.lang.reflect.InvocationTargetException

at
org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)


[INFO]

[INFO] Results:

[INFO]

[ERROR] Failures:

[ERROR]   TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
methods annotated with @OnScheduled annotation due to:
java.lang.reflect.InvocationTargetException


*The test case is here :*

@Test
public void testWriteKuduWithDefaults() throws InitializationException {
    final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
    setUpTestRunner(runner);

    final MockKuduClientService kuduClient = getKuduClientService(runner);
    createRecordReader(100, runner);

    final String filename = "testWriteKudu-" + System.currentTimeMillis();
    final Map<String,String> flowFileAttributes = new HashMap<>();
    flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
    runner.enqueue("trigger", flowFileAttributes);
    runner.run();
    runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
    // verify the successful flow file has the expected content & attributes
    final MockFlowFile mockFlowFile =
runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
    mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
    mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
    mockFlowFile.assertContentEquals("trigger");
}

*and onSchedule meMethod is here:*

@OnScheduled

public void onScheduled(final ProcessContext context) throws
IOException, LoginException {

    final String tableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
    operationType =
OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
    batchSize =
context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
    ffbatch   =
context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
    flushMode =
SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
    getLogger().debug("Setting up Kudu connection...");
    clientService =
context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
    kuduTable = clientService.getKuduClient().openTable(tableName);
    getLogger().debug("Kudu connection successfully initialized");
}

*Can someone one help me to bypass this issue? I tried all optiones
dicussed here
http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
<http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html>
*
Reply | Threaded
Open this post in threaded view
|

Re: NIFI Unit Test Error with @OnScheduled annotation

Phillip Grenier
Kumar,

I would validate you included the nifi-mock in your pom.xml.

For more information on testing review the developer guide
<https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#testing>
or this post
<https://www.nifi.rocks/developing-a-custom-apache-nifi-processor-unit-tests-partI/>
.

Hope that helps,

Phillip
https://nifi.rocks

On Tue, Feb 26, 2019 at 6:04 PM Sandish Kumar HN <[hidden email]>
wrote:

> Hi,
>
>
> *I'm stuck with below error:*
>
>
> java.lang.AssertionError: Could not invoke methods annotated with
> @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException
>
> at
>
> org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)
>
>
> [INFO]
>
> [INFO] Results:
>
> [INFO]
>
> [ERROR] Failures:
>
> [ERROR]   TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
> methods annotated with @OnScheduled annotation due to:
> java.lang.reflect.InvocationTargetException
>
>
> *The test case is here :*
>
> @Test
> public void testWriteKuduWithDefaults() throws InitializationException {
>     final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
>     setUpTestRunner(runner);
>
>     final MockKuduClientService kuduClient = getKuduClientService(runner);
>     createRecordReader(100, runner);
>
>     final String filename = "testWriteKudu-" + System.currentTimeMillis();
>     final Map<String,String> flowFileAttributes = new HashMap<>();
>     flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
>     runner.enqueue("trigger", flowFileAttributes);
>     runner.run();
>     runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
>     // verify the successful flow file has the expected content &
> attributes
>     final MockFlowFile mockFlowFile =
> runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
>     mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(),
> filename);
>     mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
>     mockFlowFile.assertContentEquals("trigger");
> }
>
> *and onSchedule meMethod is here:*
>
> @OnScheduled
>
> public void onScheduled(final ProcessContext context) throws
> IOException, LoginException {
>
>     final String tableName =
> context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
>     operationType =
> OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
>     batchSize =
> context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
>     ffbatch   =
>
> context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
>     flushMode =
>
> SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
>     getLogger().debug("Setting up Kudu connection...");
>     clientService =
>
> context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
>     kuduTable = clientService.getKuduClient().openTable(tableName);
>     getLogger().debug("Kudu connection successfully initialized");
> }
>
> *Can someone one help me to bypass this issue? I tried all optiones
> dicussed here
>
> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
> <
> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
> >
> *
>
Reply | Threaded
Open this post in threaded view
|

Re: NIFI Unit Test Error with @OnScheduled annotation

Bryan Bende
Lets use one list only.

I think there should be more to this error, I believe the invocation
target exception means another exception was thrown from OnScheduled
while the framework tried to execute OnScheduled.

Not sure why the real exception isn't visible, but you could always
try temporarily wrapping the whole OnScheduled in a try/catch and
logging any exception.

On Wed, Feb 27, 2019 at 8:24 AM Phillip Grenier <[hidden email]> wrote:

>
> Kumar,
>
> I would validate you included the nifi-mock in your pom.xml.
>
> For more information on testing review the developer guide or this post.
>
> Hope that helps,
>
> Phillip
> https://nifi.rocks
>
> On Tue, Feb 26, 2019 at 6:04 PM Sandish Kumar HN <[hidden email]> wrote:
>>
>> Hi,
>>
>>
>> *I'm stuck with below error:*
>>
>>
>> java.lang.AssertionError: Could not invoke methods annotated with
>> @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException
>>
>> at
>> org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)
>>
>>
>> [INFO]
>>
>> [INFO] Results:
>>
>> [INFO]
>>
>> [ERROR] Failures:
>>
>> [ERROR]   TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
>> methods annotated with @OnScheduled annotation due to:
>> java.lang.reflect.InvocationTargetException
>>
>>
>> *The test case is here :*
>>
>> @Test
>> public void testWriteKuduWithDefaults() throws InitializationException {
>>     final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
>>     setUpTestRunner(runner);
>>
>>     final MockKuduClientService kuduClient = getKuduClientService(runner);
>>     createRecordReader(100, runner);
>>
>>     final String filename = "testWriteKudu-" + System.currentTimeMillis();
>>     final Map<String,String> flowFileAttributes = new HashMap<>();
>>     flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
>>     runner.enqueue("trigger", flowFileAttributes);
>>     runner.run();
>>     runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
>>     // verify the successful flow file has the expected content & attributes
>>     final MockFlowFile mockFlowFile =
>> runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
>>     mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
>>     mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
>>     mockFlowFile.assertContentEquals("trigger");
>> }
>>
>> *and onSchedule meMethod is here:*
>>
>> @OnScheduled
>>
>> public void onScheduled(final ProcessContext context) throws
>> IOException, LoginException {
>>
>>     final String tableName =
>> context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
>>     operationType =
>> OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
>>     batchSize =
>> context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
>>     ffbatch   =
>> context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
>>     flushMode =
>> SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
>>     getLogger().debug("Setting up Kudu connection...");
>>     clientService =
>> context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
>>     kuduTable = clientService.getKuduClient().openTable(tableName);
>>     getLogger().debug("Kudu connection successfully initialized");
>> }
>>
>> *Can someone one help me to bypass this issue? I tried all optiones
>> dicussed here
>> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
>> <http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html>
>> *
Reply | Threaded
Open this post in threaded view
|

Re: NIFI Unit Test Error with @OnScheduled annotation

Matt Burgess-2
+1 to Bryan's comments. If I had to guess I'd say the error is
probably in the line:

kuduTable = clientService.getKuduClient().openTable(tableName);

That's the only one that does heavy lifting vs standard NiFi API
calls, and should be in a try/catch for that reason. Also IIRC setting
up external connections in @OnScheduled is a bit of an anti-pattern as
any uncaught exceptions cause the processor to not finish its
scheduling initialization properly. This used to be a problem with
leaked threads or memory or something (but has since been fixed). I
think the conventional wisdom is to have an AtomicReference to your
connection object (kuduTable here) and synchronize the onTrigger
thread(s) so the "first one in" gets to create the connection object
and the rest use it. If that's not accurate, hopefully someone who
knows more about it will correct me and clarify :)

Regards,
Matt

On Wed, Feb 27, 2019 at 9:32 AM Bryan Bende <[hidden email]> wrote:

>
> Lets use one list only.
>
> I think there should be more to this error, I believe the invocation
> target exception means another exception was thrown from OnScheduled
> while the framework tried to execute OnScheduled.
>
> Not sure why the real exception isn't visible, but you could always
> try temporarily wrapping the whole OnScheduled in a try/catch and
> logging any exception.
>
> On Wed, Feb 27, 2019 at 8:24 AM Phillip Grenier <[hidden email]> wrote:
> >
> > Kumar,
> >
> > I would validate you included the nifi-mock in your pom.xml.
> >
> > For more information on testing review the developer guide or this post.
> >
> > Hope that helps,
> >
> > Phillip
> > https://nifi.rocks
> >
> > On Tue, Feb 26, 2019 at 6:04 PM Sandish Kumar HN <[hidden email]> wrote:
> >>
> >> Hi,
> >>
> >>
> >> *I'm stuck with below error:*
> >>
> >>
> >> java.lang.AssertionError: Could not invoke methods annotated with
> >> @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException
> >>
> >> at
> >> org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)
> >>
> >>
> >> [INFO]
> >>
> >> [INFO] Results:
> >>
> >> [INFO]
> >>
> >> [ERROR] Failures:
> >>
> >> [ERROR]   TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
> >> methods annotated with @OnScheduled annotation due to:
> >> java.lang.reflect.InvocationTargetException
> >>
> >>
> >> *The test case is here :*
> >>
> >> @Test
> >> public void testWriteKuduWithDefaults() throws InitializationException {
> >>     final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
> >>     setUpTestRunner(runner);
> >>
> >>     final MockKuduClientService kuduClient = getKuduClientService(runner);
> >>     createRecordReader(100, runner);
> >>
> >>     final String filename = "testWriteKudu-" + System.currentTimeMillis();
> >>     final Map<String,String> flowFileAttributes = new HashMap<>();
> >>     flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
> >>     runner.enqueue("trigger", flowFileAttributes);
> >>     runner.run();
> >>     runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
> >>     // verify the successful flow file has the expected content & attributes
> >>     final MockFlowFile mockFlowFile =
> >> runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
> >>     mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
> >>     mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
> >>     mockFlowFile.assertContentEquals("trigger");
> >> }
> >>
> >> *and onSchedule meMethod is here:*
> >>
> >> @OnScheduled
> >>
> >> public void onScheduled(final ProcessContext context) throws
> >> IOException, LoginException {
> >>
> >>     final String tableName =
> >> context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
> >>     operationType =
> >> OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
> >>     batchSize =
> >> context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> >>     ffbatch   =
> >> context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> >>     flushMode =
> >> SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
> >>     getLogger().debug("Setting up Kudu connection...");
> >>     clientService =
> >> context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
> >>     kuduTable = clientService.getKuduClient().openTable(tableName);
> >>     getLogger().debug("Kudu connection successfully initialized");
> >> }
> >>
> >> *Can someone one help me to bypass this issue? I tried all optiones
> >> dicussed here
> >> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
> >> <http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html>
> >> *
Reply | Threaded
Open this post in threaded view
|

Re: NIFI Unit Test Error with @OnScheduled annotation

Bryan Bende
Good point Matt. Typically its the pattern like this at the beginning
of onTrigger...

if (connection == null) {
  synchronize(something) {
     If (connection == null) {
        // initialize
    }
  }
}

This way you don't synchronize every call to onTrigger which would be
expensive, but also protect against starting the processor with more
than 1 concurrent task where they both might trying to init the
connection.

On Wed, Feb 27, 2019 at 9:46 AM Matt Burgess <[hidden email]> wrote:

>
> +1 to Bryan's comments. If I had to guess I'd say the error is
> probably in the line:
>
> kuduTable = clientService.getKuduClient().openTable(tableName);
>
> That's the only one that does heavy lifting vs standard NiFi API
> calls, and should be in a try/catch for that reason. Also IIRC setting
> up external connections in @OnScheduled is a bit of an anti-pattern as
> any uncaught exceptions cause the processor to not finish its
> scheduling initialization properly. This used to be a problem with
> leaked threads or memory or something (but has since been fixed). I
> think the conventional wisdom is to have an AtomicReference to your
> connection object (kuduTable here) and synchronize the onTrigger
> thread(s) so the "first one in" gets to create the connection object
> and the rest use it. If that's not accurate, hopefully someone who
> knows more about it will correct me and clarify :)
>
> Regards,
> Matt
>
> On Wed, Feb 27, 2019 at 9:32 AM Bryan Bende <[hidden email]> wrote:
> >
> > Lets use one list only.
> >
> > I think there should be more to this error, I believe the invocation
> > target exception means another exception was thrown from OnScheduled
> > while the framework tried to execute OnScheduled.
> >
> > Not sure why the real exception isn't visible, but you could always
> > try temporarily wrapping the whole OnScheduled in a try/catch and
> > logging any exception.
> >
> > On Wed, Feb 27, 2019 at 8:24 AM Phillip Grenier <[hidden email]> wrote:
> > >
> > > Kumar,
> > >
> > > I would validate you included the nifi-mock in your pom.xml.
> > >
> > > For more information on testing review the developer guide or this post.
> > >
> > > Hope that helps,
> > >
> > > Phillip
> > > https://nifi.rocks
> > >
> > > On Tue, Feb 26, 2019 at 6:04 PM Sandish Kumar HN <[hidden email]> wrote:
> > >>
> > >> Hi,
> > >>
> > >>
> > >> *I'm stuck with below error:*
> > >>
> > >>
> > >> java.lang.AssertionError: Could not invoke methods annotated with
> > >> @OnScheduled annotation due to: java.lang.reflect.InvocationTargetException
> > >>
> > >> at
> > >> org.apache.nifi.kudu.TestPutKuduT.testWriteKuduWithDefaults(TestPutKuduT.java:70)
> > >>
> > >>
> > >> [INFO]
> > >>
> > >> [INFO] Results:
> > >>
> > >> [INFO]
> > >>
> > >> [ERROR] Failures:
> > >>
> > >> [ERROR]   TestPutKuduT.testWriteKuduWithDefaults:70 Could not invoke
> > >> methods annotated with @OnScheduled annotation due to:
> > >> java.lang.reflect.InvocationTargetException
> > >>
> > >>
> > >> *The test case is here :*
> > >>
> > >> @Test
> > >> public void testWriteKuduWithDefaults() throws InitializationException {
> > >>     final TestRunner runner = TestRunners.newTestRunner(PutKudu.class);
> > >>     setUpTestRunner(runner);
> > >>
> > >>     final MockKuduClientService kuduClient = getKuduClientService(runner);
> > >>     createRecordReader(100, runner);
> > >>
> > >>     final String filename = "testWriteKudu-" + System.currentTimeMillis();
> > >>     final Map<String,String> flowFileAttributes = new HashMap<>();
> > >>     flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
> > >>     runner.enqueue("trigger", flowFileAttributes);
> > >>     runner.run();
> > >>     runner.assertAllFlowFilesTransferred(PutKudu.REL_SUCCESS);
> > >>     // verify the successful flow file has the expected content & attributes
> > >>     final MockFlowFile mockFlowFile =
> > >> runner.getFlowFilesForRelationship(PutKudu.REL_SUCCESS).get(0);
> > >>     mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
> > >>     mockFlowFile.assertAttributeEquals(PutKudu.RECORD_COUNT_ATTR, "100");
> > >>     mockFlowFile.assertContentEquals("trigger");
> > >> }
> > >>
> > >> *and onSchedule meMethod is here:*
> > >>
> > >> @OnScheduled
> > >>
> > >> public void onScheduled(final ProcessContext context) throws
> > >> IOException, LoginException {
> > >>
> > >>     final String tableName =
> > >> context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
> > >>     operationType =
> > >> OperationType.valueOf(context.getProperty(INSERT_OPERATION).getValue());
> > >>     batchSize =
> > >> context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> > >>     ffbatch   =
> > >> context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
> > >>     flushMode =
> > >> SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
> > >>     getLogger().debug("Setting up Kudu connection...");
> > >>     clientService =
> > >> context.getProperty(KUDU_CLIENT_SERVICE).asControllerService(KuduClientService.class);
> > >>     kuduTable = clientService.getKuduClient().openTable(tableName);
> > >>     getLogger().debug("Kudu connection successfully initialized");
> > >> }
> > >>
> > >> *Can someone one help me to bypass this issue? I tried all optiones
> > >> dicussed here
> > >> http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html
> > >> <http://apache-nifi-developer-list.39713.n7.nabble.com/Error-handling-in-OnScheduled-td19446.html>
> > >> *