DetectDuplicateRecord Processor


Adam Fisher
Hello NiFi developers! I'm new to NiFi and decided to create a
*DetectDuplicateRecord* processor. Mike Thomsen also created an
implementation around the same time. It was suggested we open this up for
discussion with the community to identify use cases.

Below are the two implementations each with their respective properties.

   - https://issues.apache.org/jira/browse/NIFI-6014
      - *Record Reader*
      - *Record Writer*
      - *Cache Service*
      - *Lookup Record Path:* The record path operation used to generate
      the lookup key for each record.
      - *Cache Value Strategy:* Determines what will be written to the
      cache for each record: either a literal value or the result of a
      record path operation.
      - *Cache Value:* The value written to the cache under the record's
      key if no entry exists yet.
      - *Don't Send Empty Record Sets:* Same as "Include Zero Record
      FlowFiles" below.

   - https://issues.apache.org/jira/browse/NIFI-6047
      - *Record Reader*
      - *Record Writer*
      - *Include Zero Record FlowFiles*
      - *Cache The Entry Identifier:* Similar to DetectDuplicate
      - *Distributed Cache Service:* Similar to DetectDuplicate
      - *Age Off Duration:* Similar to DetectDuplicate
      - *Record Hashing Algorithm:* The algorithm used to hash the
      combined result of RecordPath values before caching.
      - *Filter Type:* The filter used to determine whether a record has
      been seen before, based on the matching RecordPath criteria defined
      by user-defined properties. Current options are *HashSet* and
      *BloomFilter*.
      - *Filter Capacity Hint:* An estimate of the total number of unique
      records to be processed.
      - *BloomFilter Probability:* The desired false-positive probability
      when using the BloomFilter filter type.
      - *<User Defined Properties>:* The name of each property is a record
      path. All record paths are resolved on each record to determine that
      record's unique value; the value of the user-defined property is
      ignored. The initial thought, however, was to make the value expose
      field variables the way UpdateRecord does (i.e. ${field.value}).


There are many ways duplicate records could be detected. Offering the user
the ability to:

   - *Specify the cache identifier* means users can reuse the same
   identifier in DetectDuplicateRecord processors across different process
   groups. Conversely, specifying a unique name based on, say, the file
   name isolates the uniqueness check to the daily load of that one file.
   - *Set a cache expiration* lets users do things like keep entries for
   24 hours, so unique-record state is only carried from one day to the
   next. This is useful for a daily file load where you only want to
   process the records that are new or changed.
   - *Select a filter type* allows the user to optimize for memory usage.
   I need to process multi-GB files, and keeping a hash of every record in
   an in-memory HashSet gets expensive. A BloomFilter is acceptable,
   especially when you are doing database operations downstream and don't
   care about a few false positives; it still reduces the number of
   attempted duplicate inserts/updates you perform.
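For a sense of that memory trade-off, the standard Bloom filter sizing
formulas can be worked through in a few lines of Java. This is an
illustrative sketch only; the class name and the example numbers are mine,
with the inputs named after the proposed Filter Capacity Hint and
BloomFilter Probability properties:

```java
public class BloomSizing {
    // Standard Bloom filter sizing: m = -n*ln(p)/ln(2)^2 bits,
    // k = (m/n)*ln(2) hash functions.
    static long optimalBits(long capacityHint, double probability) {
        return (long) Math.ceil(-capacityHint * Math.log(probability)
                / (Math.log(2) * Math.log(2)));
    }

    static int optimalHashes(long bits, long capacityHint) {
        return Math.max(1, (int) Math.round((double) bits / capacityHint * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 100_000_000L; // Filter Capacity Hint: 100M unique records
        double p = 0.01;       // BloomFilter Probability: 1% false positives
        long m = optimalBits(n, p);
        // Roughly 958M bits (~120 MB), versus gigabytes of HashSet entries.
        System.out.println(m + " bits, k=" + optimalHashes(m, n));
    }
}
```

At a 1% false-positive rate, 100 million records fit in roughly 120 MB of
filter, which is the kind of footprint that makes the multi-GB case viable.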


Here's hoping this finds you all warm and well. I love this software!


Adam

Re: DetectDuplicateRecord Processor

Mike Thomsen
Thanks, Adam. The use case I had could, in stereotypical agile fashion, be
summarized like this:

"As a NiFi user, I want to be able to generate UUIDv5 IDs for all of my
record sets and then have a downstream processor check each generated UUID
against the existing ingested data to see if there is an existing row with
that UUID."
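For reference, `java.util.UUID` only ships a name-based v3 generator
(`nameUUIDFromBytes`, which is MD5-based), so a deterministic UUIDv5 like
the one in this user story needs a small helper along these lines. This is
an illustrative sketch, not code from either implementation, and the DNS
namespace constant is just the RFC 4122 example; a real flow would pick its
own namespace:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

public class UuidV5 {
    // RFC 4122 pre-defined DNS namespace, used here purely for illustration.
    static final UUID NAMESPACE_DNS = UUID.fromString("6ba7b810-9dad-11d1-80b4-00c04fd430c8");

    static UUID v5(UUID namespace, String name) {
        MessageDigest sha1;
        try {
            sha1 = MessageDigest.getInstance("SHA-1");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-1 is required of every JDK
        }
        sha1.update(toBytes(namespace));
        sha1.update(name.getBytes(StandardCharsets.UTF_8));
        byte[] h = sha1.digest();              // 20 bytes; keep the first 16
        h[6] = (byte) ((h[6] & 0x0f) | 0x50);  // set version to 5
        h[8] = (byte) ((h[8] & 0x3f) | 0x80);  // set RFC 4122 variant
        long msb = 0, lsb = 0;
        for (int i = 0; i < 8; i++)  msb = (msb << 8) | (h[i] & 0xff);
        for (int i = 8; i < 16; i++) lsb = (lsb << 8) | (h[i] & 0xff);
        return new UUID(msb, lsb);
    }

    private static byte[] toBytes(UUID u) {
        byte[] b = new byte[16];
        long msb = u.getMostSignificantBits(), lsb = u.getLeastSignificantBits();
        for (int i = 0; i < 8; i++) b[i] = (byte) (msb >>> (8 * (7 - i)));
        for (int i = 0; i < 8; i++) b[8 + i] = (byte) (lsb >>> (8 * (7 - i)));
        return b;
    }
}
```

Because the output is a pure function of namespace and name, the same record
key always yields the same UUID, which is what makes the downstream
existence check possible.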

For us, at least, false positives are something that we would need to be
fairly aggressive in preventing.

One possibility here is that we split the difference with your contribution
being an in-memory deduplicator and mine going purely against a distributed
map cache client. I think there might be enough ground to cover that we
might want to have two approaches to this problem that specialize rather
than a one-size-fits-most single solution.

Thanks,

Mike

On Sat, Feb 16, 2019 at 9:18 PM Adam Fisher <[hidden email]> wrote:


Re: DetectDuplicateRecord Processor

Joe Witt
Mike, Adam,

It appears the distinction of interest here between the two general
approaches is less about in-mem vs map cache and instead is more about
approximate/fast detection vs certain/depending on size of cache approaches.

I'm not sure if this is quite right or if the distinction warrants two
processors but this is my first impression.

But it is probably best if the two of you, as contributors to this problem,
discuss and find consensus.

Thanks

On Sat, Feb 16, 2019 at 9:33 PM Mike Thomsen <[hidden email]> wrote:


Re: DetectDuplicateRecord Processor

Mike Thomsen
I'll have to look at Adam's code in more depth, but I think one reason we
might need two is that I didn't see any ability to just check an existing
record path against the cache and call it a day. For teams using a standard
UUID scheme, that's all we'd need or want. I could be wrong about that, and
Adam, please let me know if I am.

On Tue, Feb 19, 2019 at 7:28 AM Joe Witt <[hidden email]> wrote:


Re: DetectDuplicateRecord Processor

Adam Fisher
I think your use case would be achieved by specifying one user-defined
property whose record path points to the UUID. If you didn't want the cache
to persist between invocations of the processor, you would just set "Cache
The Entry Identifier" to false so it only checks for unique values within
the file rather than across incoming FlowFiles.

It does make sense, given what was said, not to use the in-memory approach.
That allows other DetectDuplicateRecord processors using the same cache
entries to avoid stepping on each other's toes when persisting the data
structure back to disk, and it seems more scalable. As for the BloomFilter
type, it would still be loaded into memory, since per-record lookups from
the cache don't really make sense given how a Bloom filter works.

I was planning to combine all the resolved values of the user-defined
record path properties and hash the combined value, so that each record
occupies a constant amount of space in the cache (hence the ability to
choose the hashing algorithm).
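That combine-then-hash step might look roughly like this. This is a sketch
only: the RecordPath resolution itself is omitted, and `resolvedValues` is
assumed to already hold the values the configured record paths produced for
one record:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

public class RecordKeyHasher {
    // Builds a constant-size cache key from the resolved record-path values.
    // "algorithm" corresponds to the proposed Record Hashing Algorithm property.
    static String cacheKey(List<String> resolvedValues, String algorithm) {
        try {
            MessageDigest digest = MessageDigest.getInstance(algorithm);
            for (String value : resolvedValues) {
                // Delimit values so ["ab","c"] and ["a","bc"] hash differently.
                digest.update(value.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown hashing algorithm: " + algorithm, e);
        }
    }
}
```

With SHA-256, every record costs the cache a fixed 64-character key
regardless of how wide the record is.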

The BloomFilter case is useful for very large workloads where you don't
care about the occasional false positive but don't want to flood the cache
with an entry per record.

It sounds like this is the scenario we're heading toward: when "Cache The
Entry Identifier" is false, use an in-memory HashSet or in-memory
BloomFilter; when true, use a cache lookup per entry for the HashSet, or an
in-memory Bloom filter that gets persisted after the processor finishes its
work. I suppose the Bloom filter algorithm could also be modified to
support cache lookups per query, treating each bit bucket as a cache entry
behind the scenes, though that's a fair bit more work if we want to go that
route.
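The persist-after-finish idea could be sketched with a `BitSet` whose byte
form round-trips through the map cache. Everything here is illustrative:
the cache client is omitted, and the double-hashing scheme is just one
common choice, not something either implementation specifies:

```java
import java.util.BitSet;

public class PersistableBloom {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;

    PersistableBloom(int numBits, int numHashes) {
        this.bits = new BitSet(numBits);
        this.numBits = numBits;
        this.numHashes = numHashes;
    }

    // Derive numHashes bit positions from two cheap hashes (double hashing).
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = Integer.rotateLeft(h1, 16) ^ 0x9e3779b9;
        return Math.floorMod(h1 + i * h2, numBits);
    }

    void add(String key) {
        for (int i = 0; i < numHashes; i++) bits.set(position(key, i));
    }

    boolean mightContain(String key) {
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(position(key, i))) return false;
        }
        return true;
    }

    // The byte form is what would be written back to the distributed map
    // cache after the processor finishes its work.
    byte[] toBytes() { return bits.toByteArray(); }

    static PersistableBloom fromBytes(byte[] data, int numBits, int numHashes) {
        PersistableBloom filter = new PersistableBloom(numBits, numHashes);
        filter.bits.or(BitSet.valueOf(data));
        return filter;
    }
}
```

Restoring from bytes preserves every set bit, so a filter reloaded from the
cache gives the same answers as the one that was persisted.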


Adam

On Tue, Feb 19, 2019, 8:07 AM Mike Thomsen <[hidden email]> wrote:
