diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 37921919cf..9ec816029c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -3,7 +3,7 @@ on: [push, pull_request, workflow_dispatch]
env:
DOCKER_CMAKE_FLAGS: -DDOCKER_VERIFY_THREAD=3 -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DCI_BUILD=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON \
-DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_PYTHON_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DENABLE_PROMETHEUS=ON \
- -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DDOCKER_BUILD_ONLY=ON
+ -DENABLE_ELASTICSEARCH=ON -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DDOCKER_BUILD_ONLY=ON
SCCACHE_GHA_ENABLE: true
CCACHE_DIR: ${{ GITHUB.WORKSPACE }}/.ccache
jobs:
@@ -26,6 +26,7 @@ jobs:
-DENABLE_BZIP2=ON
-DENABLE_CIVET=ON
-DENABLE_CONTROLLER=ON
+ -DENABLE_COUCHBASE=ON
-DENABLE_ELASTICSEARCH=ON
-DENABLE_ENCRYPT_CONFIG=ON
-DENABLE_EXPRESSION_LANGUAGE=ON
@@ -131,6 +132,7 @@ jobs:
-DENABLE_BZIP2=ON
-DENABLE_CIVET=ON
-DENABLE_CONTROLLER=ON
+ -DENABLE_COUCHBASE=ON
-DENABLE_COVERAGE=
-DENABLE_ELASTICSEARCH=ON
-DENABLE_ENCRYPT_CONFIG=ON
@@ -245,6 +247,7 @@ jobs:
-DENABLE_BZIP2=ON
-DENABLE_CIVET=ON
-DENABLE_CONTROLLER=ON
+ -DENABLE_COUCHBASE=OFF
-DENABLE_ELASTICSEARCH=OFF
-DENABLE_ENCRYPT_CONFIG=ON
-DENABLE_EXPRESSION_LANGUAGE=ON
@@ -342,6 +345,7 @@ jobs:
-DENABLE_BZIP2=ON
-DENABLE_CIVET=ON
-DENABLE_CONTROLLER=ON
+ -DENABLE_COUCHBASE=ON
-DENABLE_ELASTICSEARCH=ON
-DENABLE_ENCRYPT_CONFIG=ON
-DENABLE_EXPRESSION_LANGUAGE=ON
diff --git a/.github/workflows/verify-package.yml b/.github/workflows/verify-package.yml
index b82f615121..73a69eecb9 100644
--- a/.github/workflows/verify-package.yml
+++ b/.github/workflows/verify-package.yml
@@ -3,7 +3,7 @@ on: [workflow_dispatch]
env:
DOCKER_CMAKE_FLAGS: -DDOCKER_VERIFY_THREAD=3 -DUSE_SHARED_LIBS= -DSTRICT_GSL_CHECKS=AUDIT -DCI_BUILD=ON -DENABLE_AWS=ON -DENABLE_LIBRDKAFKA=ON -DENABLE_MQTT=ON -DENABLE_AZURE=ON -DENABLE_SQL=ON \
-DENABLE_SPLUNK=ON -DENABLE_GCP=ON -DENABLE_OPC=ON -DENABLE_PYTHON_SCRIPTING=ON -DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_TEST_PROCESSORS=ON -DENABLE_PROMETHEUS=ON \
- -DENABLE_ELASTICSEARCH=OFF -DENABLE_GRAFANA_LOKI=ON -DDOCKER_BUILD_ONLY=ON
+ -DENABLE_ELASTICSEARCH=OFF -DENABLE_GRAFANA_LOKI=ON -DENABLE_COUCHBASE=ON -DDOCKER_BUILD_ONLY=ON
CCACHE_DIR: ${{ GITHUB.WORKSPACE }}/.ccache
jobs:
rocky-build:
diff --git a/CONTROLLERS.md b/CONTROLLERS.md
index e45ccbfb5c..e3263854c1 100644
--- a/CONTROLLERS.md
+++ b/CONTROLLERS.md
@@ -17,6 +17,7 @@ limitations under the License.
- [AWSCredentialsService](#AWSCredentialsService)
- [AzureStorageCredentialsService](#AzureStorageCredentialsService)
+- [CouchbaseClusterService](#CouchbaseClusterService)
- [ElasticsearchCredentialsControllerService](#ElasticsearchCredentialsControllerService)
- [GCPCredentialsControllerService](#GCPCredentialsControllerService)
- [JsonRecordSetReader](#JsonRecordSetReader)
@@ -71,6 +72,23 @@ In the list below, the names of required properties appear in bold. Any other pr
| **Use Managed Identity Credentials** | false | true<br/>false | If true Managed Identity credentials will be used together with the Storage Account Name for authentication. |
+## CouchbaseClusterService
+
+### Description
+
+Provides centralized Couchbase connection and bucket password management. Bucket passwords can be specified via dynamic properties.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|-----------------------|---------------|------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Connection String** | | | The hostnames or IP addresses of the bootstrapping nodes and optional parameters. Syntax: couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN |
+| User Name | | | The user name to authenticate MiNiFi as a Couchbase client. |
+| User Password | | | The user password to authenticate MiNiFi as a Couchbase client.<br/>**Sensitive Property: true** |
+
+
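+As a minimal sketch (not part of this change), the same connection settings can be exercised with the Couchbase Python SDK that the integration tests in this patch use; the host name, bucket name and credentials below are placeholders:
+
+```python
+from couchbase.auth import PasswordAuthenticator
+from couchbase.cluster import Cluster
+from couchbase.options import ClusterOptions
+
+# Connection String, User Name and User Password map onto the SDK arguments below
+cluster = Cluster('couchbase://couchbase-server',
+                  ClusterOptions(PasswordAuthenticator('Administrator', 'password123')))
+collection = cluster.bucket('test_bucket').default_collection()
+collection.upsert('example_doc', {'field1': 'value1'})
+print(collection.get('example_doc').content_as[dict])
+```
+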
## ElasticsearchCredentialsControllerService
### Description
diff --git a/LICENSE b/LICENSE
index 087a3fb552..5033bad4a3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -214,6 +214,7 @@ This product bundles 'Kubernetes Client Library for C' (kubernetes-client/c), wh
This project bundles a configuration file from 'Kubernetes Metrics Server' (kubernetes-sigs/metrics-server), which is available under an ALv2 license
This project bundles 'OpenSSL' which is available under an ALv2 license
This project bundles 'gRPC' which is available under an ALv2 license
+This project bundles 'couchbase-cxx-client' which is available under an ALv2 license
The Apache NiFi - MiNiFi C++ project contains subcomponents with separate copyright
notices and license terms. Your use of the source code for the these
@@ -1002,7 +1003,7 @@ These libraries are in the public domain (or the equivalent where that is not po
regexp.c and regexp.h from https://github.com/ccxvii/minilibs sha 875c33568b5a4aa4fb3dd0c52ea98f7f0e5ca684
-This product bundles snappy within librdkafka, under the license below.
+This product bundles snappy within librdkafka and couchbase-cxx-client, under the license below.
LICENSE.snappy
--------------------------------------------------------------
@@ -3307,3 +3308,154 @@ Permission is hereby granted, free of charge, to any person obtaining a copy of
The above copyright notice and this permission notice (including the next paragraph) shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+This product bundles HdrHistogram within couchbase-cxx-client, under the license below.
+
+Creative Commons Legal Code
+
+CC0 1.0 Universal
+
+ CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
+ LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
+ ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
+ INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
+ REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
+ PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
+ THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
+ HEREUNDER.
+
+Statement of Purpose
+
+The laws of most jurisdictions throughout the world automatically confer
+exclusive Copyright and Related Rights (defined below) upon the creator
+and subsequent owner(s) (each and all, an "owner") of an original work of
+authorship and/or a database (each, a "Work").
+
+Certain owners wish to permanently relinquish those rights to a Work for
+the purpose of contributing to a commons of creative, cultural and
+scientific works ("Commons") that the public can reliably and without fear
+of later claims of infringement build upon, modify, incorporate in other
+works, reuse and redistribute as freely as possible in any form whatsoever
+and for any purposes, including without limitation commercial purposes.
+These owners may contribute to the Commons to promote the ideal of a free
+culture and the further production of creative, cultural and scientific
+works, or to gain reputation or greater distribution for their Work in
+part through the use and efforts of others.
+
+For these and/or other purposes and motivations, and without any
+expectation of additional consideration or compensation, the person
+associating CC0 with a Work (the "Affirmer"), to the extent that he or she
+is an owner of Copyright and Related Rights in the Work, voluntarily
+elects to apply CC0 to the Work and publicly distribute the Work under its
+terms, with knowledge of his or her Copyright and Related Rights in the
+Work and the meaning and intended legal effect of CC0 on those rights.
+
+1. Copyright and Related Rights. A Work made available under CC0 may be
+protected by copyright and related or neighboring rights ("Copyright and
+Related Rights"). Copyright and Related Rights include, but are not
+limited to, the following:
+
+ i. the right to reproduce, adapt, distribute, perform, display,
+ communicate, and translate a Work;
+ ii. moral rights retained by the original author(s) and/or performer(s);
+iii. publicity and privacy rights pertaining to a person's image or
+ likeness depicted in a Work;
+ iv. rights protecting against unfair competition in regards to a Work,
+ subject to the limitations in paragraph 4(a), below;
+ v. rights protecting the extraction, dissemination, use and reuse of data
+ in a Work;
+ vi. database rights (such as those arising under Directive 96/9/EC of the
+ European Parliament and of the Council of 11 March 1996 on the legal
+ protection of databases, and under any national implementation
+ thereof, including any amended or successor version of such
+ directive); and
+vii. other similar, equivalent or corresponding rights throughout the
+ world based on applicable law or treaty, and any national
+ implementations thereof.
+
+2. Waiver. To the greatest extent permitted by, but not in contravention
+of, applicable law, Affirmer hereby overtly, fully, permanently,
+irrevocably and unconditionally waives, abandons, and surrenders all of
+Affirmer's Copyright and Related Rights and associated claims and causes
+of action, whether now known or unknown (including existing as well as
+future claims and causes of action), in the Work (i) in all territories
+worldwide, (ii) for the maximum duration provided by applicable law or
+treaty (including future time extensions), (iii) in any current or future
+medium and for any number of copies, and (iv) for any purpose whatsoever,
+including without limitation commercial, advertising or promotional
+purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
+member of the public at large and to the detriment of Affirmer's heirs and
+successors, fully intending that such Waiver shall not be subject to
+revocation, rescission, cancellation, termination, or any other legal or
+equitable action to disrupt the quiet enjoyment of the Work by the public
+as contemplated by Affirmer's express Statement of Purpose.
+
+3. Public License Fallback. Should any part of the Waiver for any reason
+be judged legally invalid or ineffective under applicable law, then the
+Waiver shall be preserved to the maximum extent permitted taking into
+account Affirmer's express Statement of Purpose. In addition, to the
+extent the Waiver is so judged Affirmer hereby grants to each affected
+person a royalty-free, non transferable, non sublicensable, non exclusive,
+irrevocable and unconditional license to exercise Affirmer's Copyright and
+Related Rights in the Work (i) in all territories worldwide, (ii) for the
+maximum duration provided by applicable law or treaty (including future
+time extensions), (iii) in any current or future medium and for any number
+of copies, and (iv) for any purpose whatsoever, including without
+limitation commercial, advertising or promotional purposes (the
+"License"). The License shall be deemed effective as of the date CC0 was
+applied by Affirmer to the Work. Should any part of the License for any
+reason be judged legally invalid or ineffective under applicable law, such
+partial invalidity or ineffectiveness shall not invalidate the remainder
+of the License, and in such case Affirmer hereby affirms that he or she
+will not (i) exercise any of his or her remaining Copyright and Related
+Rights in the Work or (ii) assert any associated claims and causes of
+action with respect to the Work, in either case contrary to Affirmer's
+express Statement of Purpose.
+
+4. Limitations and Disclaimers.
+
+ a. No trademark or patent rights held by Affirmer are waived, abandoned,
+ surrendered, licensed or otherwise affected by this document.
+ b. Affirmer offers the Work as-is and makes no representations or
+ warranties of any kind concerning the Work, express, implied,
+ statutory or otherwise, including without limitation warranties of
+ title, merchantability, fitness for a particular purpose, non
+ infringement, or the absence of latent or other defects, accuracy, or
+ the present or absence of errors, whether or not discoverable, all to
+ the greatest extent permissible under applicable law.
+ c. Affirmer disclaims responsibility for clearing rights of other persons
+ that may apply to the Work or any use thereof, including without
+ limitation any person's Copyright and Related Rights in the Work.
+ Further, Affirmer disclaims responsibility for obtaining any necessary
+ consents, permissions or other rights required for any use of the
+ Work.
+ d. Affirmer understands and acknowledges that Creative Commons is not a
+ party to this document and has no duty or obligation with respect to
+ this CC0 or use of the Work.
+
+
+This product bundles llhttp within couchbase-cxx-client, under the license below.
+
+This software is licensed under the MIT License.
+
+Copyright Fedor Indutny, 2018.
+
+Permission is hereby granted, free of charge, to any person obtaining a
+copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to permit
+persons to whom the Software is furnished to do so, subject to the
+following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/NOTICE b/NOTICE
index 8a2390bb36..81a9644b29 100644
--- a/NOTICE
+++ b/NOTICE
@@ -73,6 +73,9 @@ This software includes third party software subject to the following copyrights:
- RE2 - Copyright (c) 2009 The RE2 Authors. All rights reserved.
- c-ares - Copyright (c) 1998 Massachusetts Institute of Technology Copyright (c) 2007 - 2023 Daniel Stenberg with many contributors, see AUTHORS file.
- lua - Copyright (c) 1994–2023 Lua.org, PUC-Rio.
+- couchbase-cxx-client - Copyright 2023-Present Couchbase, Inc.
+- snappy - Copyright 2011, Google Inc.
+- llhttp - Copyright Fedor Indutny, 2018.
The licenses for these third party components are included in LICENSE.txt
diff --git a/PROCESSORS.md b/PROCESSORS.md
index e2ecf5eac4..eec26bbac2 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -77,6 +77,7 @@ limitations under the License.
- [PushGrafanaLokiREST](#PushGrafanaLokiREST)
- [PutAzureBlobStorage](#PutAzureBlobStorage)
- [PutAzureDataLakeStorage](#PutAzureDataLakeStorage)
+- [PutCouchbaseKey](#PutCouchbaseKey)
- [PutFile](#PutFile)
- [PutGCSObject](#PutGCSObject)
- [PutOPCProcessor](#PutOPCProcessor)
@@ -2204,6 +2205,47 @@ In the list below, the names of required properties appear in bold. Any other pr
| failure | Files that could not be written to Azure storage for some reason are transferred to this relationship |
+## PutCouchbaseKey
+
+### Description
+
+Put a document to Couchbase Server via Key/Value access.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+|------------------------------------------|---------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| **Couchbase Cluster Controller Service** | | | A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster. |
+| **Bucket Name** | default | | The name of the bucket to access.<br/>**Supports Expression Language: true** |
+| Scope Name | | | Scope to use inside the bucket. If not specified, the _default scope is used.<br/>**Supports Expression Language: true** |
+| Collection Name | | | Collection to use inside the bucket scope. If not specified, the _default collection is used.<br/>**Supports Expression Language: true** |
+| **Document Type** | Json | Json<br/>Binary<br/>String | Content type to store data as. |
+| Document Id | | | A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. If not specified, the FlowFile's uuid attribute is used, or a generated UUID if that attribute is not present.<br/>**Supports Expression Language: true** |
+| **Persist To** | NONE | NONE<br/>ACTIVE<br/>ONE<br/>TWO<br/>THREE<br/>FOUR | Durability constraint about disk persistence. |
+| **Replicate To** | NONE | NONE<br/>ONE<br/>TWO<br/>THREE | Durability constraint about replication. |
+
+### Relationships
+
+| Name | Description |
+|---------|------------------------------------------------------------------------------------------------------------|
+| success | All FlowFiles that are written to Couchbase Server are routed to this relationship. |
+| failure | All FlowFiles that failed to be written to Couchbase Server and cannot be retried are routed to this relationship. |
+| retry   | All FlowFiles that failed to be written to Couchbase Server but can be retried are routed to this relationship.     |
+
+### Output Attributes
+
+| Attribute | Relationship | Description |
+|-------------------------------|--------------|-----------------------------------------------|
+| couchbase.bucket | success | Bucket where the document was stored. |
+| couchbase.doc.id | success | Id of the document. |
+| couchbase.doc.cas | success | CAS of the document. |
+| couchbase.doc.sequence.number | success | Sequence number associated with the document. |
+| couchbase.partition.uuid | success | UUID of partition. |
+| couchbase.partition.id | success | ID of partition (also known as vBucket). |
+
+
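+As a minimal sketch (not part of this change), a document written by PutCouchbaseKey can be read back with the Couchbase Python SDK used by the integration tests, and its CAS compared against the couchbase.doc.cas attribute; the host, bucket and document id below are placeholders matching the test scenario:
+
+```python
+from couchbase.auth import PasswordAuthenticator
+from couchbase.cluster import Cluster
+from couchbase.options import ClusterOptions
+
+cluster = Cluster('couchbase://couchbase-server',
+                  ClusterOptions(PasswordAuthenticator('Administrator', 'password123')))
+result = cluster.bucket('test_bucket').default_collection().get('test_doc_id')
+print(result.cas)               # should match the couchbase.doc.cas FlowFile attribute
+print(result.content_as[dict])  # the JSON document stored by PutCouchbaseKey
+```
+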
## PutFile
### Description
diff --git a/README.md b/README.md
index 8285e4b4d9..069d142ada 100644
--- a/README.md
+++ b/README.md
@@ -75,6 +75,7 @@ The next table outlines CMAKE flags that correspond with MiNiFi extensions. Exte
| AWS | [AWSCredentialsService](CONTROLLERS.md#awscredentialsservice)<br/>[PutS3Object](PROCESSORS.md#puts3object)<br/>[DeleteS3Object](PROCESSORS.md#deletes3object)<br/>[FetchS3Object](PROCESSORS.md#fetchs3object)<br/>[ListS3](PROCESSORS.md#lists3) | -DENABLE_AWS=ON |
| Azure | [AzureStorageCredentialsService](CONTROLLERS.md#azurestoragecredentialsservice)<br/>[PutAzureBlobStorage](PROCESSORS.md#putazureblobstorage)<br/>[DeleteAzureBlobStorage](PROCESSORS.md#deleteazureblobstorage)<br/>[FetchAzureBlobStorage](PROCESSORS.md#fetchazureblobstorage)<br/>[ListAzureBlobStorage](PROCESSORS.md#listazureblobstorage)<br/>[PutAzureDataLakeStorage](PROCESSORS.md#putazuredatalakestorage)<br/>[DeleteAzureDataLakeStorage](PROCESSORS.md#deleteazuredatalakestorage)<br/>[FetchAzureDataLakeStorage](PROCESSORS.md#fetchazuredatalakestorage)<br/>[ListAzureDataLakeStorage](PROCESSORS.md#listazuredatalakestorage) | -DENABLE_AZURE=ON |
| CivetWeb | [ListenHTTP](PROCESSORS.md#listenhttp) | -DENABLE_CIVET=ON |
+| Couchbase | [CouchbaseClusterService](CONTROLLERS.md#couchbaseclusterservice)<br/>[PutCouchbaseKey](PROCESSORS.md#putcouchbasekey) | -DENABLE_COUCHBASE=ON |
| Elasticsearch | [ElasticsearchCredentialsControllerService](CONTROLLERS.md#elasticsearchcredentialscontrollerservice)<br/>[PostElasticsearch](PROCESSORS.md#postelasticsearch) | -DENABLE_ELASTICSEARCH=ON |
| ExecuteProcess (Linux and macOS) | [ExecuteProcess](PROCESSORS.md#executeprocess) | -DENABLE_EXECUTE_PROCESS=ON |
| Google Cloud Platform | [DeleteGCSObject](PROCESSORS.md#deletegcsobject)<br/>[FetchGCSObject](PROCESSORS.md#fetchgcsobject)<br/>[GCPCredentialsControllerService](CONTROLLERS.md#gcpcredentialscontrollerservice)<br/>[ListGCSBucket](PROCESSORS.md#listgcsbucket)<br/>[PutGCSObject](PROCESSORS.md#putgcsobject) | -DENABLE_GCP=ON |
diff --git a/Windows.md b/Windows.md
index a87abd85fd..3afb5d287a 100644
--- a/Windows.md
+++ b/Windows.md
@@ -122,7 +122,7 @@ A basic working CMake configuration can be inferred from the `win_build_vs.bat`.
```
mkdir build
cd build
-cmake -G "Visual Studio 17 2022" -A x64 -DMINIFI_INCLUDE_VC_REDIST_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF -DENABLE_SQL=OFF -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=OFF -DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF -DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_OPENCV=OFF -DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE_SHARED_LIBS=OFF -DENABLE_CONTROLLER=ON -DENABLE_BUSTACHE=OFF -DENABLE_ENCRYPT_CONFIG=OFF -DENABLE_LUA_SCRIPTING=OFF -DENABLE_MQTT=OFF -DENABLE_OPC=OFF -DENABLE_OPS=OFF -DENABLE_PYTHON_SCRIPTING= -DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF -DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DMINIFI_FAIL_ON_WARNINGS=OFF -DSKIP_TESTS=OFF ..
+cmake -G "Visual Studio 17 2022" -A x64 -DMINIFI_INCLUDE_VC_REDIST_MERGE_MODULES=OFF -DTEST_CUSTOM_WEL_PROVIDER=OFF -DENABLE_SQL=OFF -DMINIFI_USE_REAL_ODBC_TEST_DRIVER=OFF -DCMAKE_BUILD_TYPE_INIT=Release -DCMAKE_BUILD_TYPE=Release -DWIN32=WIN32 -DENABLE_LIBRDKAFKA=OFF -DENABLE_AWS=OFF -DENABLE_PDH= -DENABLE_AZURE=OFF -DENABLE_SFTP=OFF -DENABLE_SPLUNK= -DENABLE_GCP= -DENABLE_OPENCV=OFF -DENABLE_PROMETHEUS=OFF -DENABLE_ELASTICSEARCH= -DUSE_SHARED_LIBS=OFF -DENABLE_CONTROLLER=ON -DENABLE_BUSTACHE=OFF -DENABLE_ENCRYPT_CONFIG=OFF -DENABLE_LUA_SCRIPTING=OFF -DENABLE_MQTT=OFF -DENABLE_OPC=OFF -DENABLE_OPS=OFF -DENABLE_PYTHON_SCRIPTING= -DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF -DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DENABLE_COUCHBASE=ON -DMINIFI_FAIL_ON_WARNINGS=OFF -DSKIP_TESTS=OFF ..
msbuild /m nifi-minifi-cpp.sln /property:Configuration=Release /property:Platform=x64
copy minifi_main\Release\minifi.exe minifi_main\
cpack
diff --git a/bootstrap.sh b/bootstrap.sh
index 45e630ade2..6b331de185 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -316,6 +316,8 @@ add_option PROCFS_ENABLED ${TRUE} "ENABLE_PROCFS"
add_option PROMETHEUS_ENABLED ${TRUE} "ENABLE_PROMETHEUS"
+add_option COUCHBASE_ENABLED ${FALSE} "ENABLE_COUCHBASE"
+
USE_SHARED_LIBS=${TRUE}
ASAN_ENABLED=${FALSE}
MINIFI_FAIL_ON_WARNINGS=${FALSE}
diff --git a/bstrp_functions.sh b/bstrp_functions.sh
index 471938d05f..7311eb1667 100755
--- a/bstrp_functions.sh
+++ b/bstrp_functions.sh
@@ -401,6 +401,7 @@ show_supported_features() {
echo "AE. Prometheus Support .........$(print_feature_status PROMETHEUS_ENABLED)"
echo "AF. Elasticsearch Support ......$(print_feature_status ELASTIC_ENABLED)"
echo "AG. Grafana Loki Support .......$(print_feature_status GRAFANA_LOKI_ENABLED)"
+ echo "AH. Couchbase Support ..........$(print_feature_status COUCHBASE_ENABLED)"
echo "****************************************"
echo " Build Options."
echo "****************************************"
@@ -448,6 +449,7 @@ read_feature_options(){
ae) ToggleFeature PROMETHEUS_ENABLED ;;
af) ToggleFeature ELASTIC_ENABLED ;;
ag) ToggleFeature GRAFANA_LOKI_ENABLED ;;
+ ah) ToggleFeature COUCHBASE_ENABLED ;;
1) ToggleFeature TESTS_ENABLED ;;
2) EnableAllFeatures ;;
4) ToggleFeature USE_SHARED_LIBS;;
diff --git a/cmake/Asio.cmake b/cmake/Asio.cmake
index 28f4bb928a..6f94e6afeb 100644
--- a/cmake/Asio.cmake
+++ b/cmake/Asio.cmake
@@ -27,5 +27,5 @@ if(NOT asio_POPULATED)
add_library(asio INTERFACE)
target_include_directories(asio SYSTEM INTERFACE ${asio_SOURCE_DIR}/asio/include)
find_package(Threads)
- target_link_libraries(asio INTERFACE Threads::Threads)
+ target_link_libraries(asio INTERFACE Threads::Threads OpenSSL::SSL OpenSSL::Crypto)
endif()
diff --git a/cmake/Bustache.cmake b/cmake/Bustache.cmake
index 6c037af6c9..e3d104cda0 100644
--- a/cmake/Bustache.cmake
+++ b/cmake/Bustache.cmake
@@ -21,8 +21,15 @@ include(GetFmt)
get_fmt()
set(BUSTACHE_USE_FMT ON CACHE STRING "" FORCE)
+
+set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/bustache/add-append.patch")
+
+set(PC ${Bash_EXECUTABLE} -c "set -x &&\
+ (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")")
+
FetchContent_Declare(Bustache
GIT_REPOSITORY https://github.com/jamboree/bustache.git
GIT_TAG 47096caa8e1f9f7ebe34e3a022dbb822c174011d
+ PATCH_COMMAND "${PC}"
)
FetchContent_MakeAvailable(Bustache)
diff --git a/cmake/Couchbase.cmake b/cmake/Couchbase.cmake
new file mode 100644
index 0000000000..cf4ff7fb10
--- /dev/null
+++ b/cmake/Couchbase.cmake
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include(FetchContent)
+
+include(fmt)
+include(Spdlog)
+include(Asio)
+
+set(COUCHBASE_CXX_CLIENT_BUILD_STATIC ON CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_BUILD_SHARED OFF CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_BUILD_TESTS OFF CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_BUILD_DOCS OFF CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_BUILD_TOOLS OFF CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_POST_LINKED_OPENSSL ON CACHE BOOL "" FORCE)
+set(COUCHBASE_CXX_CLIENT_INSTALL OFF CACHE BOOL "" FORCE)
+
+set(PATCH_FILE "${CMAKE_SOURCE_DIR}/thirdparty/couchbase/remove-thirdparty.patch")
+
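+# Apply the patch only once: the reversed dry run succeeds if the patch is already applied; otherwise it is applied with -N.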
+set(PC ${Bash_EXECUTABLE} -c "set -x &&\
+ (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE}\\\")")
+
+FetchContent_Declare(couchbase-cxx-client
+ URL https://github.com/couchbase/couchbase-cxx-client/releases/download/1.0.2/couchbase-cxx-client-1.0.2.tar.gz
+ URL_HASH SHA256=1954e6f5e063d94675428182bc8b1b82fd8e8532c10d1787f157aeb18bb37769
+ PATCH_COMMAND "${PC}"
+)
+FetchContent_MakeAvailable(couchbase-cxx-client)
+
+set(COUCHBASE_INCLUDE_DIR "${couchbase-cxx-client_SOURCE_DIR}" CACHE STRING "" FORCE)
diff --git a/cmake/DockerConfig.cmake b/cmake/DockerConfig.cmake
index 7346de263b..860503387d 100644
--- a/cmake/DockerConfig.cmake
+++ b/cmake/DockerConfig.cmake
@@ -67,6 +67,7 @@ add_custom_target(
-DENABLE_OPENCV=OFF
-DENABLE_BUSTACHE=OFF
-DENABLE_SFTP=OFF
+ -DENABLE_COUCHBASE=OFF
-DENABLE_TEST_PROCESSORS=OFF
-DENABLE_ROCKSDB=ON
-DENABLE_LIBARCHIVE=ON
diff --git a/cmake/MiNiFiOptions.cmake b/cmake/MiNiFiOptions.cmake
index 8a7ccc58cf..e69548ba0e 100644
--- a/cmake/MiNiFiOptions.cmake
+++ b/cmake/MiNiFiOptions.cmake
@@ -111,6 +111,7 @@ add_minifi_option(ENABLE_TEST_PROCESSORS "Enables test processors" OFF)
add_minifi_option(ENABLE_PROMETHEUS "Enables Prometheus support." ON)
add_minifi_option(ENABLE_GRAFANA_LOKI "Enable Grafana Loki support" OFF)
add_minifi_option(ENABLE_GRPC_FOR_LOKI "Enable gRPC for Grafana Loki extension" ON)
+add_minifi_option(ENABLE_COUCHBASE "Enable Couchbase support" OFF)
add_minifi_option(ENABLE_EXECUTE_PROCESS "Enable ExecuteProcess processor" OFF)
add_minifi_option(ENABLE_CONTROLLER "Enables the build of MiNiFi controller binary." ON)
diff --git a/cmake/Spdlog.cmake b/cmake/Spdlog.cmake
index 3b07298297..0631ca783d 100644
--- a/cmake/Spdlog.cmake
+++ b/cmake/Spdlog.cmake
@@ -20,8 +20,8 @@ include(FetchContent)
set(SPDLOG_FMT_EXTERNAL ON CACHE STRING "" FORCE)
FetchContent_Declare(Spdlog
- URL https://github.com/gabime/spdlog/archive/refs/tags/v1.12.0.tar.gz
- URL_HASH SHA256=4dccf2d10f410c1e2feaff89966bfc49a1abb29ef6f08246335b110e001e09a9
+ URL https://github.com/gabime/spdlog/archive/refs/tags/v1.14.1.tar.gz
+ URL_HASH SHA256=1586508029a7d0670dfcb2d97575dcdc242d3868a259742b69f100801ab4e16b
OVERRIDE_FIND_PACKAGE
)
FetchContent_MakeAvailable(Spdlog)
diff --git a/cmake/fmt.cmake b/cmake/fmt.cmake
index fc48799ce7..93fd56efb5 100644
--- a/cmake/fmt.cmake
+++ b/cmake/fmt.cmake
@@ -18,8 +18,8 @@
#
include(FetchContent)
FetchContent_Declare(Fmt
- URL https://github.com/fmtlib/fmt/archive/refs/tags/10.1.0.tar.gz
- URL_HASH SHA256=deb0a3ad2f5126658f2eefac7bf56a042488292de3d7a313526d667f3240ca0a
+ URL https://github.com/fmtlib/fmt/archive/refs/tags/11.0.2.tar.gz
+ URL_HASH SHA256=6cb1e6d37bdcb756dbbe59be438790db409cdb4868c66e888d5df9f13f7c027f
OVERRIDE_FIND_PACKAGE
)
FetchContent_MakeAvailable(Fmt)
diff --git a/docker/requirements.txt b/docker/requirements.txt
index 294f836ff9..09e7995890 100644
--- a/docker/requirements.txt
+++ b/docker/requirements.txt
@@ -12,3 +12,4 @@ azure-storage-blob==12.13.0
prometheus-api-client==0.5.0
humanfriendly==10.0
requests<2.29 # https://github.com/docker/docker-py/issues/3113
+couchbase==4.3.2
diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py
index f1e13a8948..9c8527458a 100644
--- a/docker/test/integration/cluster/ContainerStore.py
+++ b/docker/test/integration/cluster/ContainerStore.py
@@ -40,6 +40,7 @@
from .containers.GrafanaLokiContainer import GrafanaLokiOptions
from .containers.ReverseProxyContainer import ReverseProxyContainer
from .containers.DiagSlave import DiagSlave
+from .containers.CouchbaseServerContainer import CouchbaseServerContainer
from .FeatureContext import FeatureContext
@@ -302,6 +303,14 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c
network=self.network,
image_store=self.image_store,
command=command))
+ elif engine == "couchbase-server":
+ return self.containers.setdefault(container_name,
+ CouchbaseServerContainer(feature_context=feature_context,
+ name=container_name,
+ vols=self.vols,
+ network=self.network,
+ image_store=self.image_store,
+ command=command))
else:
raise Exception('invalid flow engine: \'%s\'' % engine)
@@ -411,3 +420,7 @@ def enable_multi_tenancy_in_grafana_loki(self):
def enable_ssl_in_nifi(self):
self.nifi_options.use_ssl = True
+
+ def run_post_startup_commands(self, container_name):
+ container_name = self.get_container_name_with_postfix(container_name)
+ return self.containers[container_name].run_post_startup_commands()
diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py
index 92e554fb0a..e161d5da47 100644
--- a/docker/test/integration/cluster/DockerTestCluster.py
+++ b/docker/test/integration/cluster/DockerTestCluster.py
@@ -35,6 +35,7 @@
from .checkers.SplunkChecker import SplunkChecker
from .checkers.GrafanaLokiChecker import GrafanaLokiChecker
from .checkers.ModbusChecker import ModbusChecker
+from .checkers.CouchbaseChecker import CouchbaseChecker
from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check
@@ -54,6 +55,7 @@ def __init__(self, context, feature_id):
self.grafana_loki_checker = GrafanaLokiChecker()
self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator)
self.modbus_checker = ModbusChecker(self.container_communicator)
+ self.couchbase_checker = CouchbaseChecker()
def cleanup(self):
self.container_store.cleanup()
@@ -308,8 +310,11 @@ def wait_for_container_startup_to_finish(self, container_name):
startup_success = self.wait_for_startup_log(container_name, 300)
if not startup_success:
logging.error("Cluster startup failed for %s", container_name)
- self.log_app_output()
- return startup_success
+ return False
+ if not self.container_store.run_post_startup_commands(container_name):
+ logging.error("Failed to run post startup commands for container %s", container_name)
+ return False
+ return True
def wait_for_all_containers_to_finish_startup(self):
for container_name in self.container_store.get_container_names():
@@ -424,3 +429,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):
def enable_ssl_in_nifi(self):
self.container_store.enable_ssl_in_nifi()
+
+ def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
+ return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
diff --git a/docker/test/integration/cluster/checkers/CouchbaseChecker.py b/docker/test/integration/cluster/checkers/CouchbaseChecker.py
new file mode 100644
index 0000000000..07a332cbf6
--- /dev/null
+++ b/docker/test/integration/cluster/checkers/CouchbaseChecker.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import json
+from couchbase.cluster import Cluster
+from couchbase.options import ClusterOptions
+from couchbase.auth import PasswordAuthenticator
+from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder
+
+
+class CouchbaseChecker:
+ def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
+ try:
+ cluster = Cluster('couchbase://localhost', ClusterOptions(
+ PasswordAuthenticator('Administrator', 'password123')))
+
+ bucket = cluster.bucket(bucket_name)
+ collection = bucket.default_collection()
+
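+        # The checks below rely on Couchbase's common flags, which encode the value format in the top byte (0x02 = JSON, 0x03 = raw binary, 0x04 = string)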
+ if expected_data_type.lower() == "binary":
+ binary_flag = 0x03 << 24
+ result = collection.get(doc_id, transcoder=RawBinaryTranscoder())
+ flags = result.flags
+ if not flags & binary_flag:
+ logging.error(f"Expected binary data for document '{doc_id}' but no binary flags were found.")
+ return False
+
+ content = result.content_as[bytes]
+ return content.decode('utf-8') == expected_data
+
+ if expected_data_type.lower() == "json":
+ json_flag = 0x02 << 24
+ result = collection.get(doc_id)
+ flags = result.flags
+ if not flags & json_flag:
+ logging.error(f"Expected JSON data for document '{doc_id}' but no JSON flags were found.")
+ return False
+
+ content = result.content_as[dict]
+ return content == json.loads(expected_data)
+
+ if expected_data_type.lower() == "string":
+ string_flag = 0x04 << 24
+ result = collection.get(doc_id, transcoder=RawStringTranscoder())
+ flags = result.flags
+ if not flags & string_flag:
+ logging.error(f"Expected string data for document '{doc_id}' but no string flags were found.")
+ return False
+
+ content = result.content_as[str]
+ return content == expected_data
+
+ logging.error(f"Unsupported data type '{expected_data_type}'")
+ return False
+ except Exception as e:
+ logging.error(f"Error while fetching document '{doc_id}' from bucket '{bucket_name}': {e}")
+ return False
diff --git a/docker/test/integration/cluster/containers/Container.py b/docker/test/integration/cluster/containers/Container.py
index b4a8efe852..d2e73fd4ba 100644
--- a/docker/test/integration/cluster/containers/Container.py
+++ b/docker/test/integration/cluster/containers/Container.py
@@ -34,6 +34,7 @@ def __init__(self, feature_context: FeatureContext, name, engine, vols, network,
# Get docker client
self.client = docker.from_env()
self.deployed = False
+ self.post_startup_commands_finished = False
def cleanup(self):
logging.info('Cleaning up container: %s', self.name)
@@ -84,3 +85,6 @@ def get_startup_finished_log_entry(self):
def get_app_log(self):
raise NotImplementedError()
+
+ def run_post_startup_commands(self):
+ return True
diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
new file mode 100644
index 0000000000..1c43530460
--- /dev/null
+++ b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .Container import Container
+from utils import retry_check
+
+
+class CouchbaseServerContainer(Container):
+ def __init__(self, feature_context, name, vols, network, image_store, command=None):
+ super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command)
+
+ def get_startup_finished_log_entry(self):
+ # after startup the logs are only available in the container, only this message is shown
+ return "logs available in"
+
+ @retry_check(15, 2)
+ def run_post_startup_commands(self):
+ if self.post_startup_commands_finished:
+ return True
+
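+        # Initialize a single-node cluster with data, index and query services, then create the bucket used by the integration tests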
+ commands = [
+ ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
+ "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
+ ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
+ "--bucket-ramsize", "1024"]
+ ]
+ for command in commands:
+ (code, _) = self.client.containers.get(self.name).exec_run(command)
+ if code != 0:
+ return False
+ self.post_startup_commands_finished = True
+ return True
+
+ def deploy(self):
+ if not self.set_deployed():
+ return
+
+ self.docker_container = self.client.containers.run(
+ "couchbase:community-7.6.2",
+ detach=True,
+ name=self.name,
+ network=self.network.name,
+ ports={'11210/tcp': 11210},
+ entrypoint=self.command)
diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py
index 16a30c7ee5..57526342db 100644
--- a/docker/test/integration/features/MiNiFi_integration_test_driver.py
+++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py
@@ -63,38 +63,43 @@ def start_kafka_broker(self, context):
self.cluster.acquire_container(context=context, name='kafka-broker', engine='kafka-broker')
self.cluster.deploy_container(name='zookeeper')
self.cluster.deploy_container(name='kafka-broker')
- assert self.cluster.wait_for_container_startup_to_finish('kafka-broker')
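+ # on startup failure, log_app_output() dumps the container logs as part of evaluating the assert expression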
+ assert self.cluster.wait_for_container_startup_to_finish('kafka-broker') or self.cluster.log_app_output()
def start_splunk(self, context):
self.cluster.acquire_container(context=context, name='splunk', engine='splunk')
self.cluster.deploy_container(name='splunk')
- assert self.cluster.wait_for_container_startup_to_finish('splunk')
- assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token')
+ assert self.cluster.wait_for_container_startup_to_finish('splunk') or self.cluster.log_app_output()
+ assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token') or self.cluster.log_app_output()
def start_elasticsearch(self, context):
self.cluster.acquire_container(context=context, name='elasticsearch', engine='elasticsearch')
self.cluster.deploy_container('elasticsearch')
- assert self.cluster.wait_for_container_startup_to_finish('elasticsearch')
+ assert self.cluster.wait_for_container_startup_to_finish('elasticsearch') or self.cluster.log_app_output()
def start_opensearch(self, context):
self.cluster.acquire_container(context=context, name='opensearch', engine='opensearch')
self.cluster.deploy_container('opensearch')
- assert self.cluster.wait_for_container_startup_to_finish('opensearch')
+ assert self.cluster.wait_for_container_startup_to_finish('opensearch') or self.cluster.log_app_output()
def start_minifi_c2_server(self, context):
self.cluster.acquire_container(context=context, name="minifi-c2-server", engine="minifi-c2-server")
self.cluster.deploy_container('minifi-c2-server')
- assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server')
+ assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output()
+
+ def start_couchbase_server(self, context):
+ self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server')
+ self.cluster.deploy_container('couchbase-server')
+ assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output()
def start(self, container_name=None):
if container_name is not None:
logging.info("Starting container %s", container_name)
self.cluster.deploy_container(container_name)
- assert self.cluster.wait_for_container_startup_to_finish(container_name)
+ assert self.cluster.wait_for_container_startup_to_finish(container_name) or self.cluster.log_app_output()
return
logging.info("MiNiFi_integration_test start")
self.cluster.deploy_all()
- assert self.cluster.wait_for_all_containers_to_finish_startup()
+ assert self.cluster.wait_for_all_containers_to_finish_startup() or self.cluster.log_app_output()
def stop(self, container_name):
logging.info("Stopping container %s", container_name)
@@ -443,3 +448,6 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd):
def enable_ssl_in_nifi(self):
self.cluster.enable_ssl_in_nifi()
+
+ def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str):
+ assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type)
diff --git a/docker/test/integration/features/couchbase.feature b/docker/test/integration/features/couchbase.feature
new file mode 100644
index 0000000000..446821f25c
--- /dev/null
+++ b/docker/test/integration/features/couchbase.feature
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+@ENABLE_COUCHBASE
+Feature: Executing Couchbase operations from MiNiFi-C++
+ Background:
+ Given the content of "/tmp/output" is monitored
+
+ Scenario: A MiNiFi instance can insert json data to test bucket with PutCouchbaseKey processor
+ Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+ And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
+ And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
+ And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
+ And the "Document Type" property of the PutCouchbaseKey processor is set to "Json"
+ And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
+ And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
+ And a CouchbaseClusterService is set up with the name "CouchbaseClusterService"
+
+ And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
+ And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute
+
+ When a Couchbase server is started
+ And all instances start up
+
+ Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 60 seconds
+ And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
+ And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1", "field2": "value2"}' of type "Json" in Couchbase
+
+ Scenario: A MiNiFi instance can insert binary data to test bucket with PutCouchbaseKey processor
+ Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+ And a file with the content '{"field1": "value1"}' is present in '/tmp/input'
+ And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
+ And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
+ And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary"
+ And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
+ And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
+ And a CouchbaseClusterService is set up with the name "CouchbaseClusterService"
+
+ And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
+ And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute
+
+ When a Couchbase server is started
+ And all instances start up
+
+ Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 60 seconds
+ And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.doc.sequence.number value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.partition.uuid value:[1-9][0-9]*" in less than 1 seconds
+ And the Minifi logs match the following regex: "key:couchbase.partition.id value:[1-9][0-9]*" in less than 1 seconds
+ And a document with id "test_doc_id" in bucket "test_bucket" is present with data '{"field1": "value1"}' of type "Binary" in Couchbase
diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py
index ea099b21d5..6d4e464e90 100644
--- a/docker/test/integration/features/steps/steps.py
+++ b/docker/test/integration/features/steps/steps.py
@@ -26,6 +26,7 @@
from minifi.controllers.KubernetesControllerService import KubernetesControllerService
from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
from minifi.controllers.JsonRecordSetReader import JsonRecordSetReader
+from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from behave import given, then, when
from behave.model_describe import ModelDescriptor
@@ -1366,3 +1367,23 @@ def step_impl(context):
@given(u'PLC register has been set with {modbus_cmd} command')
def step_impl(context, modbus_cmd):
context.test.set_value_on_plc_with_modbus(context.test.get_container_name_with_postfix('diag-slave-tcp'), modbus_cmd)
+
+
+# Couchbase
+@when(u'a Couchbase server is started')
+def step_impl(context):
+ context.test.start_couchbase_server(context)
+
+
+@given("a CouchbaseClusterService is setup up with the name \"{service_name}\"")
+def step_impl(context, service_name):
+ couchbase_cluster_controller_service = CouchbaseClusterService(
+ name=service_name,
+ connection_string="couchbase://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")))
+ container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
+ container.add_controller(couchbase_cluster_controller_service)
+
+
+@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
+def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
+ context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)
diff --git a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
new file mode 100644
index 0000000000..e06c1e6ffb
--- /dev/null
+++ b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ..core.ControllerService import ControllerService
+
+
+class CouchbaseClusterService(ControllerService):
+ def __init__(self, name, connection_string):
+ super(CouchbaseClusterService, self).__init__(name=name)
+
+ self.service_class = 'CouchbaseClusterService'
+ self.properties['Connection String'] = connection_string
+ self.properties['User Name'] = "Administrator"
+ self.properties['User Password'] = "password123"
diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py b/docker/test/integration/minifi/processors/PutCouchbaseKey.py
new file mode 100644
index 0000000000..5e94aaa07b
--- /dev/null
+++ b/docker/test/integration/minifi/processors/PutCouchbaseKey.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from ..core.Processor import Processor
+
+
+class PutCouchbaseKey(Processor):
+ def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+ super(PutCouchbaseKey, self).__init__(
+ context=context,
+ clazz='PutCouchbaseKey',
+ auto_terminate=['success', 'failure'],
+ schedule=schedule)
diff --git a/extensions/couchbase/CMakeLists.txt b/extensions/couchbase/CMakeLists.txt
new file mode 100644
index 0000000000..0299ce8fde
--- /dev/null
+++ b/extensions/couchbase/CMakeLists.txt
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+if (NOT (ENABLE_ALL OR ENABLE_COUCHBASE))
+ return()
+endif()
+
+include(Couchbase)
+include(${CMAKE_SOURCE_DIR}/extensions/ExtensionHeader.txt)
+
+file(GLOB SOURCES "*.cpp" "controllerservices/*.cpp" "processors/*.cpp")
+
+add_minifi_library(minifi-couchbase SHARED ${SOURCES})
+
+target_include_directories(minifi-couchbase PRIVATE BEFORE "${CMAKE_CURRENT_SOURCE_DIR}" "controllerservices" "processors" ${COUCHBASE_INCLUDE_DIR})
+target_link_libraries(minifi-couchbase ${LIBMINIFI} couchbase_cxx_client_static hdr_histogram_static snappy llhttp::llhttp)
+
+register_extension(minifi-couchbase "COUCHBASE EXTENSIONS" COUCHBASE-EXTENSIONS "This enables Couchbase support" "extensions/couchbase/tests")
+
+register_extension_linter(minifi-couchbase-extensions-linter)
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
new file mode 100644
index 0000000000..5207657f29
--- /dev/null
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.cpp
@@ -0,0 +1,168 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CouchbaseClusterService.h"
+#include "couchbase/codec/raw_binary_transcoder.hxx"
+#include "couchbase/codec/raw_string_transcoder.hxx"
+#include "couchbase/codec/raw_json_transcoder.hxx"
+
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+namespace {
+
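+// Error codes treated as transient: failures with these are reported as CouchbaseErrorType::TEMPORARY so the caller can retry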
+constexpr auto temporary_connection_errors = std::to_array<::couchbase::errc::common>({
+ ::couchbase::errc::common::temporary_failure,
+ ::couchbase::errc::common::request_canceled,
+ ::couchbase::errc::common::internal_server_failure,
+ ::couchbase::errc::common::cas_mismatch,
+ ::couchbase::errc::common::ambiguous_timeout,
+ ::couchbase::errc::common::unambiguous_timeout,
+ ::couchbase::errc::common::rate_limited,
+ ::couchbase::errc::common::quota_limited
+});
+
+CouchbaseErrorType getErrorType(const std::error_code& error_code) {
+ for (const auto& temporary_error : temporary_connection_errors) {
+    if (static_cast<int>(temporary_error) == error_code.value()) {
+ return CouchbaseErrorType::TEMPORARY;
+ }
+ }
+ return CouchbaseErrorType::FATAL;
+}
+
+} // namespace
+
+nonstd::expected<::couchbase::collection, CouchbaseErrorType> CouchbaseClient::getCollection(const CouchbaseCollection& collection) {
+ auto connection_result = establishConnection();
+ if (!connection_result) {
+ return nonstd::make_unexpected(connection_result.error());
+ }
+ std::lock_guard lock(cluster_mutex_);
+ return cluster_->bucket(collection.bucket_name).scope(collection.scope_name).collection(collection.collection_name);
+}
+
+nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> CouchbaseClient::upsert(const CouchbaseCollection& collection,
+    CouchbaseValueType document_type, const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+ auto collection_result = getCollection(collection);
+ if (!collection_result.has_value()) {
+ return nonstd::make_unexpected(collection_result.error());
+ }
+
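+  // Select the transcoder matching the requested document type: JSON and binary payloads are
+  // passed through as raw bytes, while string payloads are converted to std::string first.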
+ std::pair<::couchbase::error, ::couchbase::mutation_result> result;
+ if (document_type == CouchbaseValueType::Json) {
+ result = collection_result->upsert<::couchbase::codec::raw_json_transcoder>(document_id, buffer, options).get();
+ } else if (document_type == CouchbaseValueType::String) {
+    std::string data_str(reinterpret_cast<const char*>(buffer.data()), buffer.size());
+ result = collection_result->upsert<::couchbase::codec::raw_string_transcoder>(document_id, data_str, options).get();
+ } else {
+ result = collection_result->upsert<::couchbase::codec::raw_binary_transcoder>(document_id, buffer, options).get();
+ }
+ auto& [upsert_err, upsert_resp] = result;
+ if (upsert_err.ec()) {
+ // ambiguous_timeout should not be retried as we do not know if the insert was successful or not
+    if (getErrorType(upsert_err.ec()) == CouchbaseErrorType::TEMPORARY && upsert_err.ec().value() != static_cast<int>(::couchbase::errc::common::ambiguous_timeout)) {
+      logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, error code: '{}', message: '{}'",
+ document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
+ return nonstd::make_unexpected(CouchbaseErrorType::TEMPORARY);
+ }
+ logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' with error code: '{}', message: '{}'",
+ document_id, collection.bucket_name, collection.scope_name, collection.collection_name, upsert_err.ec(), upsert_err.message());
+ return nonstd::make_unexpected(CouchbaseErrorType::FATAL);
+ } else {
+ const uint64_t partition_uuid = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_uuid() : 0);
+ const uint64_t sequence_number = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->sequence_number() : 0);
+ const uint16_t partition_id = (upsert_resp.mutation_token().has_value() ? upsert_resp.mutation_token()->partition_id() : 0);
+ return CouchbaseUpsertResult {
+ collection.bucket_name,
+ upsert_resp.cas().value(),
+      sequence_number,
+      partition_uuid,
+ partition_id
+ };
+ }
+}
+
+void CouchbaseClient::close() {
+ std::lock_guard lock(cluster_mutex_);
+ if (cluster_) {
+ cluster_->close().wait();
+ }
+ cluster_ = std::nullopt;
+}
+
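+// Establishes the cluster connection on first use; if a connection is already established it is reused.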
+nonstd::expected<void, CouchbaseErrorType> CouchbaseClient::establishConnection() {
+ std::lock_guard lock(cluster_mutex_);
+ if (cluster_) {
+ return {};
+ }
+
+ auto options = ::couchbase::cluster_options(username_, password_);
+ auto [connect_err, cluster] = ::couchbase::cluster::connect(connection_string_, options).get();
+ if (connect_err.ec()) {
+ logger_->log_error("Failed to connect to Couchbase cluster with error code: '{}' and message: '{}'", connect_err.ec(), connect_err.message());
+ return nonstd::make_unexpected(getErrorType(connect_err.ec()));
+ }
+
+ cluster_ = std::move(cluster);
+ return {};
+}
+
+namespace controllers {
+
+void CouchbaseClusterService::initialize() {
+ setSupportedProperties(Properties);
+}
+
+void CouchbaseClusterService::onEnable() {
+ std::string connection_string;
+ getProperty(ConnectionString, connection_string);
+ std::string username;
+ getProperty(UserName, username);
+ std::string password;
+ getProperty(UserPassword, password);
+ if (connection_string.empty() || username.empty() || password.empty()) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing connection string, username or password");
+ }
+
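+  // Try to connect eagerly; a temporary failure only logs a warning, and the connection is retried when a Couchbase processor is triggered.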
+  client_ = std::make_unique<CouchbaseClient>(connection_string, username, password, logger_);
+ auto result = client_->establishConnection();
+ if (!result) {
+ if (result.error() == CouchbaseErrorType::FATAL) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Failed to connect to Couchbase cluster with fatal error");
+ }
+ logger_->log_warn("Failed to connect to Couchbase cluster with temporary error, will retry connection when a Couchbase processor is triggered");
+ }
+}
+
+gsl::not_null<std::shared_ptr<CouchbaseClusterService>> CouchbaseClusterService::getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property) {
+  std::shared_ptr<CouchbaseClusterService> couchbase_cluster_service;
+ if (auto connection_controller_name = context.getProperty(property)) {
+    couchbase_cluster_service = std::dynamic_pointer_cast<CouchbaseClusterService>(context.getControllerService(*connection_controller_name, context.getProcessorNode()->getUUID()));
+ }
+ if (!couchbase_cluster_service) {
+ throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Missing Couchbase Cluster Service");
+ }
+ return gsl::make_not_null(couchbase_cluster_service);
+}
+
+REGISTER_RESOURCE(CouchbaseClusterService, ControllerService);
+
+} // namespace controllers
+} // namespace org::apache::nifi::minifi::couchbase
diff --git a/extensions/couchbase/controllerservices/CouchbaseClusterService.h b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
new file mode 100644
index 0000000000..b848b741dc
--- /dev/null
+++ b/extensions/couchbase/controllerservices/CouchbaseClusterService.h
@@ -0,0 +1,162 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <string>
+
+#include "core/controller/ControllerService.h"
+#include "core/PropertyDefinition.h"
+#include "core/PropertyDefinitionBuilder.h"
+#include "core/PropertyType.h"
+#include "couchbase/cluster.hxx"
+#include "core/ProcessContext.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org::apache::nifi::minifi::couchbase {
+
+struct CouchbaseCollection {
+ std::string bucket_name;
+ std::string scope_name;
+ std::string collection_name;
+};
+
+struct CouchbaseUpsertResult {
+ std::string bucket_name;
+ std::uint64_t cas{0};
+ std::uint64_t sequence_number{0};
+ std::uint64_t partition_uuid{0};
+ std::uint16_t partition_id{0};
+};
+
+enum class CouchbaseValueType {
+ Json,
+ Binary,
+ String
+};
+
+enum class CouchbaseErrorType {
+ FATAL,
+ TEMPORARY,
+};
+
+class CouchbaseClient {
+ public:
+  CouchbaseClient(std::string connection_string, std::string username, std::string password, const std::shared_ptr<core::logging::Logger>& logger)
+ : connection_string_(std::move(connection_string)), username_(std::move(username)), password_(std::move(password)), logger_(logger) {
+ }
+
+ ~CouchbaseClient() {
+ close();
+ }
+
+ CouchbaseClient(const CouchbaseClient&) = delete;
+ CouchbaseClient(CouchbaseClient&&) = delete;
+ CouchbaseClient& operator=(CouchbaseClient&&) = delete;
+ CouchbaseClient& operator=(const CouchbaseClient&) = delete;
+
+  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
+    const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options);
+  nonstd::expected<void, CouchbaseErrorType> establishConnection();
+ void close();
+
+ private:
+ nonstd::expected<::couchbase::collection, CouchbaseErrorType> getCollection(const CouchbaseCollection& collection);
+
+ std::string connection_string_;
+ std::string username_;
+ std::string password_;
+ std::mutex cluster_mutex_;
+ std::optional<::couchbase::cluster> cluster_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+namespace controllers {
+
+class CouchbaseClusterService : public core::controller::ControllerService {
+ public:
+ explicit CouchbaseClusterService(std::string_view name, const minifi::utils::Identifier &uuid = {})
+ : ControllerService(name, uuid) {
+ }
+
+  explicit CouchbaseClusterService(std::string_view name, const std::shared_ptr<Configure>& /*configuration*/)
+ : ControllerService(name) {
+ }
+
+ EXTENSIONAPI static constexpr const char* Description = "Provides a centralized Couchbase connection and bucket passwords management. Bucket passwords can be specified via dynamic properties.";
+
+ EXTENSIONAPI static constexpr auto ConnectionString = core::PropertyDefinitionBuilder<>::createProperty("Connection String")
+    .withDescription("The hostnames or IP addresses of the bootstrapping nodes and optional parameters. Syntax: couchbase://node1,node2,nodeN?param1=value1&param2=value2&paramN=valueN")
+ .isRequired(true)
+ .build();
+ EXTENSIONAPI static constexpr auto UserName = core::PropertyDefinitionBuilder<>::createProperty("User Name")
+ .withDescription("The user name to authenticate MiNiFi as a Couchbase client.")
+ .build();
+ EXTENSIONAPI static constexpr auto UserPassword = core::PropertyDefinitionBuilder<>::createProperty("User Password")
+ .withDescription("The user password to authenticate MiNiFi as a Couchbase client.")
+ .isSensitive(true)
+ .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+ ConnectionString,
+ UserName,
+ UserPassword
+ });
+
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+ void initialize() override;
+
+ void yield() override {
+  }
+
+ bool isWorkAvailable() override {
+ return false;
+  }
+
+ bool isRunning() const override {
+ return getState() == core::controller::ControllerServiceState::ENABLED;
+ }
+
+ void onEnable() override;
+ void notifyStop() override {
+ if (client_) {
+ client_->close();
+ }
+ }
+
+  virtual nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type,
+      const std::string& document_id, const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) {
+ gsl_Expects(client_);
+ return client_->upsert(collection, document_type, document_id, buffer, options);
+ }
+
+  static gsl::not_null<std::shared_ptr<CouchbaseClusterService>> getFromProperty(const core::ProcessContext& context, const core::PropertyReference& property);
+
+ private:
+  std::unique_ptr<CouchbaseClient> client_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<CouchbaseClusterService>::getLogger(uuid_);
+};
+
+} // namespace controllers
+} // namespace org::apache::nifi::minifi::couchbase
diff --git a/extensions/couchbase/processors/PutCouchbaseKey.cpp b/extensions/couchbase/processors/PutCouchbaseKey.cpp
new file mode 100644
index 0000000000..6f8ed54ceb
--- /dev/null
+++ b/extensions/couchbase/processors/PutCouchbaseKey.cpp
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PutCouchbaseKey.h"
+#include "utils/gsl.h"
+#include "core/Resource.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::couchbase::processors {
+
+void PutCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
+ couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, PutCouchbaseKey::CouchbaseClusterControllerService);
+  document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, PutCouchbaseKey::DocumentType);
+ persist_to_ = utils::parseEnumProperty<::couchbase::persist_to>(context, PutCouchbaseKey::PersistTo);
+ replicate_to_ = utils::parseEnumProperty<::couchbase::replicate_to>(context, PutCouchbaseKey::ReplicateTo);
+}
+
+void PutCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
+ gsl_Expects(couchbase_cluster_service_);
+
+ auto flow_file = session.get();
+ if (!flow_file) {
+ context.yield();
+ return;
+ }
+
+ CouchbaseCollection collection;
+ if (!context.getProperty(BucketName, collection.bucket_name, flow_file.get()) || collection.bucket_name.empty()) {
+ logger_->log_error("Bucket '{}' is invalid or empty!", collection.bucket_name);
+ session.transfer(flow_file, Failure);
+ return;
+ }
+
+ if (!context.getProperty(ScopeName, collection.scope_name, flow_file.get()) || collection.scope_name.empty()) {
+ collection.scope_name = ::couchbase::scope::default_name;
+ }
+
+ if (!context.getProperty(CollectionName, collection.collection_name, flow_file.get()) || collection.collection_name.empty()) {
+ collection.collection_name = ::couchbase::collection::default_name;
+ }
+
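+  // Use the configured document id if set; otherwise fall back to the FlowFile's uuid attribute or a freshly generated uuid.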
+ std::string document_id;
+ if (!context.getProperty(DocumentId, document_id, flow_file.get()) || document_id.empty()) {
+ document_id = flow_file->getAttribute(core::SpecialFlowAttribute::UUID).value_or(utils::IdGenerator::getIdGenerator()->generate().to_string());
+ }
+
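+  // Apply the configured durability constraints, then upsert the FlowFile content and record the result in couchbase.* attributes on success.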
+ ::couchbase::upsert_options options;
+ options.durability(persist_to_, replicate_to_);
+ auto result = session.readBuffer(flow_file);
+ if (auto upsert_result = couchbase_cluster_service_->upsert(collection, document_type_, document_id, result.buffer, options)) {
+ session.putAttribute(*flow_file, "couchbase.bucket", upsert_result->bucket_name);
+ session.putAttribute(*flow_file, "couchbase.doc.id", document_id);
+ session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(upsert_result->cas));
+ session.putAttribute(*flow_file, "couchbase.doc.sequence.number", std::to_string(upsert_result->sequence_number));
+ session.putAttribute(*flow_file, "couchbase.partition.uuid", std::to_string(upsert_result->partition_uuid));
+ session.putAttribute(*flow_file, "couchbase.partition.id", std::to_string(upsert_result->partition_id));
+ session.transfer(flow_file, Success);
+ } else if (upsert_result.error() == CouchbaseErrorType::TEMPORARY) {
+ logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}' due to temporary issue, transferring to retry relationship",
+ document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
+ session.transfer(flow_file, Retry);
+ } else {
+ logger_->log_error("Failed to upsert document '{}' to collection '{}.{}.{}', transferring to failure relationship",
+ document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
+ session.transfer(flow_file, Failure);
+ }
+}
+
+REGISTER_RESOURCE(PutCouchbaseKey, Processor);
+
+} // namespace org::apache::nifi::minifi::couchbase::processors
diff --git a/extensions/couchbase/processors/PutCouchbaseKey.h b/extensions/couchbase/processors/PutCouchbaseKey.h
new file mode 100644
index 0000000000..2b4f420afe
--- /dev/null
+++ b/extensions/couchbase/processors/PutCouchbaseKey.h
@@ -0,0 +1,161 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <array>
+#include <memory>
+#include <string>
+
+#include "core/AbstractProcessor.h"
+#include "core/ProcessSession.h"
+#include "utils/Enum.h"
+#include "core/logging/LoggerConfiguration.h"
+#include "CouchbaseClusterService.h"
+#include "couchbase/persist_to.hxx"
+#include "couchbase/replicate_to.hxx"
+
+namespace magic_enum::customize {
+
+template <>
+constexpr customize_t enum_name<::couchbase::persist_to>(::couchbase::persist_to value) noexcept {
+ switch (value) {
+ case ::couchbase::persist_to::none:
+ return "NONE";
+ case ::couchbase::persist_to::active:
+ return "ACTIVE";
+ case ::couchbase::persist_to::one:
+ return "ONE";
+ case ::couchbase::persist_to::two:
+ return "TWO";
+ case ::couchbase::persist_to::three:
+ return "THREE";
+ case ::couchbase::persist_to::four:
+ return "FOUR";
+ }
+ return invalid_tag;
+}
+
+template <>
+constexpr customize_t enum_name<::couchbase::replicate_to>(::couchbase::replicate_to value) noexcept {
+ switch (value) {
+ case ::couchbase::replicate_to::none:
+ return "NONE";
+ case ::couchbase::replicate_to::one:
+ return "ONE";
+ case ::couchbase::replicate_to::two:
+ return "TWO";
+ case ::couchbase::replicate_to::three:
+ return "THREE";
+ }
+ return invalid_tag;
+}
+} // namespace magic_enum::customize
+
+namespace org::apache::nifi::minifi::couchbase::processors {
+
+class PutCouchbaseKey final : public core::AbstractProcessor<PutCouchbaseKey> {
+ public:
+ using core::AbstractProcessor::AbstractProcessor;
+
+ EXTENSIONAPI static constexpr const char* Description = "Put a document to Couchbase Server via Key/Value access.";
+
+ EXTENSIONAPI static constexpr auto CouchbaseClusterControllerService = core::PropertyDefinitionBuilder<>::createProperty("Couchbase Cluster Controller Service")
+ .withDescription("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.")
+    .withAllowedTypes<controllers::CouchbaseClusterService>()
+ .isRequired(true)
+ .build();
+ EXTENSIONAPI static constexpr auto BucketName = core::PropertyDefinitionBuilder<>::createProperty("Bucket Name")
+    .withDescription("The name of the bucket to access.")
+ .withDefaultValue("default")
+ .isRequired(true)
+ .supportsExpressionLanguage(true)
+ .build();
+ EXTENSIONAPI static constexpr auto ScopeName = core::PropertyDefinitionBuilder<>::createProperty("Scope Name")
+ .withDescription("Scope to use inside the bucket. If not specified, the _default scope is used.")
+ .supportsExpressionLanguage(true)
+ .build();
+ EXTENSIONAPI static constexpr auto CollectionName = core::PropertyDefinitionBuilder<>::createProperty("Collection Name")
+ .withDescription("Collection to use inside the bucket scope. If not specified, the _default collection is used.")
+ .supportsExpressionLanguage(true)
+ .build();
+  EXTENSIONAPI static constexpr auto DocumentType = core::PropertyDefinitionBuilder<magic_enum::enum_count<CouchbaseValueType>()>::createProperty("Document Type")
+ .withDescription("Content type to store data as.")
+ .isRequired(true)
+ .withDefaultValue(magic_enum::enum_name(CouchbaseValueType::Json))
+    .withAllowedValues(magic_enum::enum_names<CouchbaseValueType>())
+ .build();
+ EXTENSIONAPI static constexpr auto DocumentId = core::PropertyDefinitionBuilder<>::createProperty("Document Id")
+ .withDescription("A static, fixed Couchbase document id, or an expression to construct the Couchbase document id. "
+ "If not specified, either the FlowFile uuid attribute or if that's not found a generated uuid will be used.")
+ .supportsExpressionLanguage(true)
+ .build();
+ EXTENSIONAPI static constexpr auto PersistTo = core::PropertyDefinitionBuilder<6>::createProperty("Persist To")
+ .withDescription("Durability constraint about disk persistence.")
+ .isRequired(true)
+ .withDefaultValue(magic_enum::enum_name(::couchbase::persist_to::none))
+ .withAllowedValues(magic_enum::enum_names<::couchbase::persist_to>())
+ .build();
+ EXTENSIONAPI static constexpr auto ReplicateTo = core::PropertyDefinitionBuilder<4>::createProperty("Replicate To")
+ .withDescription("Durability constraint about replication.")
+ .isRequired(true)
+ .withDefaultValue(magic_enum::enum_name(::couchbase::replicate_to::none))
+ .withAllowedValues(magic_enum::enum_names<::couchbase::replicate_to>())
+ .build();
+
+  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
+ CouchbaseClusterControllerService,
+ BucketName,
+ ScopeName,
+ CollectionName,
+ DocumentType,
+ DocumentId,
+ PersistTo,
+ ReplicateTo
+ });
+
+ EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "All FlowFiles that are written to Couchbase Server are routed to this relationship."};
+  EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "All FlowFiles that failed to be written to Couchbase Server and cannot be retried are routed to this relationship."};
+  EXTENSIONAPI static constexpr auto Retry = core::RelationshipDefinition{"retry", "All FlowFiles that failed to be written to Couchbase Server but can be retried are routed to this relationship."};
+ EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure, Retry};
+
+ EXTENSIONAPI static constexpr auto CouchbaseBucket = core::OutputAttributeDefinition<>{"couchbase.bucket", {Success}, "Bucket where the document was stored."};
+ EXTENSIONAPI static constexpr auto CouchbaseDocId = core::OutputAttributeDefinition<>{"couchbase.doc.id", {Success}, "Id of the document."};
+ EXTENSIONAPI static constexpr auto CouchbaseDocCas = core::OutputAttributeDefinition<>{"couchbase.doc.cas", {Success}, "CAS of the document."};
+ EXTENSIONAPI static constexpr auto CouchbaseDocSequenceNumber = core::OutputAttributeDefinition<>{"couchbase.doc.sequence.number", {Success}, "Sequence number associated with the document."};
+ EXTENSIONAPI static constexpr auto CouchbasePartitionUUID = core::OutputAttributeDefinition<>{"couchbase.partition.uuid", {Success}, "UUID of partition."};
+ EXTENSIONAPI static constexpr auto CouchbasePartitionId = core::OutputAttributeDefinition<>{"couchbase.partition.id", {Success}, "ID of partition (also known as vBucket)."};
+ EXTENSIONAPI static constexpr auto OutputAttributes = std::array{
+ CouchbaseBucket, CouchbaseDocId, CouchbaseDocCas, CouchbaseDocSequenceNumber, CouchbasePartitionUUID, CouchbasePartitionId};
+
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
+ EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED;
+ EXTENSIONAPI static constexpr bool IsSingleThreaded = false;
+
+ void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& sessionFactory) override;
+ void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
+
+ private:
+  std::shared_ptr<controllers::CouchbaseClusterService> couchbase_cluster_service_;
+  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<PutCouchbaseKey>::getLogger(uuid_);
+ CouchbaseValueType document_type_ = CouchbaseValueType::Json;
+ ::couchbase::persist_to persist_to_ = ::couchbase::persist_to::none;
+ ::couchbase::replicate_to replicate_to_ = ::couchbase::replicate_to::none;
+};
+
+} // namespace org::apache::nifi::minifi::couchbase::processors
diff --git a/extensions/couchbase/tests/CMakeLists.txt b/extensions/couchbase/tests/CMakeLists.txt
new file mode 100644
index 0000000000..18caaeff0f
--- /dev/null
+++ b/extensions/couchbase/tests/CMakeLists.txt
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB COUCHBASE_TESTS "*.cpp")
+
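+# Build a separate test executable for each test source file and register it with CTest.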
+SET(COUCHBASE_TEST_COUNT 0)
+FOREACH(testfile ${COUCHBASE_TESTS})
+ get_filename_component(testfilename "${testfile}" NAME_WE)
+ add_minifi_executable("${testfilename}" "${testfile}")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/couchbase/controllerservices")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/couchbase/processors")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${CMAKE_SOURCE_DIR}/extensions/couchbase/")
+ target_include_directories(${testfilename} PRIVATE BEFORE "${COUCHBASE_INCLUDE_DIR}")
+
+ createTests("${testfilename}")
+ target_link_libraries(${testfilename} Catch2WithMain)
+ target_link_libraries(${testfilename} minifi-couchbase)
+ target_link_libraries(${testfilename} minifi-standard-processors)
+ MATH(EXPR COUCHBASE_TEST_COUNT "${COUCHBASE_TEST_COUNT}+1")
+ add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${COUCHBASE_TEST_COUNT} Couchbase related test file(s)...")
diff --git a/extensions/couchbase/tests/MockCouchbaseClusterService.h b/extensions/couchbase/tests/MockCouchbaseClusterService.h
new file mode 100644
index 0000000000..a2757f16c0
--- /dev/null
+++ b/extensions/couchbase/tests/MockCouchbaseClusterService.h
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <optional>
+#include <string>
+#include "CouchbaseClusterService.h"
+#include "unit/Catch.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::couchbase::test {
+
+const std::uint64_t COUCHBASE_PUT_RESULT_CAS = 9876;
+const std::uint64_t COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER = 345;
+const std::uint64_t COUCHBASE_PUT_RESULT_PARTITION_UUID = 7890123456;
+const std::uint16_t COUCHBASE_PUT_RESULT_PARTITION_ID = 1234;
+
+struct UpsertParameters {
+ CouchbaseValueType document_type;
+ std::string document_id;
+  std::vector<std::byte> buffer;
+ ::couchbase::upsert_options options;
+};
+
+class MockCouchbaseClusterService : public controllers::CouchbaseClusterService {
+ public:
+ using CouchbaseClusterService::CouchbaseClusterService;
+ EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
+ ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_CONTROLLER_SERVICES
+
+ void onEnable() override {}
+ void notifyStop() override {}
+
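+  // Records the arguments of the last upsert call for later verification and returns either a canned result or the configured error.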
+  nonstd::expected<CouchbaseUpsertResult, CouchbaseErrorType> upsert(const CouchbaseCollection& collection, CouchbaseValueType document_type, const std::string& document_id,
+      const std::vector<std::byte>& buffer, const ::couchbase::upsert_options& options) override {
+ collection_ = collection;
+ upsert_parameters_.document_type = document_type;
+ upsert_parameters_.document_id = document_id;
+ upsert_parameters_.buffer = buffer;
+ upsert_parameters_.options = options;
+
+ if (upsert_error_) {
+ return nonstd::make_unexpected(*upsert_error_);
+ } else {
+ return CouchbaseUpsertResult{collection_.bucket_name, COUCHBASE_PUT_RESULT_CAS, COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER, COUCHBASE_PUT_RESULT_PARTITION_UUID, COUCHBASE_PUT_RESULT_PARTITION_ID};
+ }
+ }
+
+ UpsertParameters getUpsertParameters() const {
+ return upsert_parameters_;
+ }
+
+ CouchbaseCollection getCollectionParameter() const {
+ return collection_;
+ }
+
+ void setUpsertError(const CouchbaseErrorType upsert_error) {
+ upsert_error_ = upsert_error;
+ }
+
+ private:
+ CouchbaseCollection collection_;
+ UpsertParameters upsert_parameters_;
+  std::optional<CouchbaseErrorType> upsert_error_;
+};
+} // namespace org::apache::nifi::minifi::couchbase::test
diff --git a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
new file mode 100644
index 0000000000..84afe362b1
--- /dev/null
+++ b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
@@ -0,0 +1,172 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "unit/SingleProcessorTestController.h"
+#include "processors/PutCouchbaseKey.h"
+#include "MockCouchbaseClusterService.h"
+#include "core/Resource.h"
+
+namespace org::apache::nifi::minifi::couchbase::test {
+
+REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService);
+
+const std::string TEST_UUID = "a53f0e78-b91a-4a82-939b-639174edb00b";
+
+struct ExpectedCallOptions {
+ std::string bucket_name;
+ std::string scope_name;
+ std::string collection_name;
+ ::couchbase::persist_to persist_to;
+ ::couchbase::replicate_to replicate_to;
+ CouchbaseValueType document_type;
+ std::string doc_id;
+};
+
+class PutCouchbaseKeyTestController : public TestController {
+ public:
+ PutCouchbaseKeyTestController()
+    : controller_(std::make_unique<processors::PutCouchbaseKey>("PutCouchbaseKey")),
+ proc_(controller_.getProcessor()) {
+    LogTestController::getInstance().setDebug<TestPlan>();
+    LogTestController::getInstance().setDebug<minifi::core::Processor>();
+    LogTestController::getInstance().setTrace<processors::PutCouchbaseKey>();
+    LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
+    LogTestController::getInstance().setDebug<MockCouchbaseClusterService>();
+ auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService");
+    mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+ proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService");
+ }
+
+  [[nodiscard]] static std::vector<std::byte> stringToByteVector(const std::string& str) {
+    std::vector<std::byte> byte_vector;
+ byte_vector.reserve(str.size());
+ for (char ch : str) {
+      byte_vector.push_back(static_cast<std::byte>(ch));
+ }
+ return byte_vector;
+ }
+
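+  // Verifies relationship routing, the parameters forwarded to the mock cluster service, and (on success) the couchbase.* output attributes.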
+ void verifyResults(const minifi::test::ProcessorTriggerResult& results, const minifi::core::Relationship& expected_result, const ExpectedCallOptions& expected_call_options,
+ const std::string& input) const {
+    std::shared_ptr<core::FlowFile> flow_file;
+ if (expected_result == processors::PutCouchbaseKey::Success) {
+ REQUIRE(results.at(processors::PutCouchbaseKey::Success).size() == 1);
+ REQUIRE(results.at(processors::PutCouchbaseKey::Failure).empty());
+ REQUIRE(results.at(processors::PutCouchbaseKey::Retry).empty());
+ flow_file = results.at(processors::PutCouchbaseKey::Success)[0];
+ } else if (expected_result == processors::PutCouchbaseKey::Failure) {
+ REQUIRE(results.at(processors::PutCouchbaseKey::Success).empty());
+ REQUIRE(results.at(processors::PutCouchbaseKey::Failure).size() == 1);
+ REQUIRE(results.at(processors::PutCouchbaseKey::Retry).empty());
+ flow_file = results.at(processors::PutCouchbaseKey::Failure)[0];
+ REQUIRE(LogTestController::getInstance().contains("Failed to upsert document", 1s));
+ } else {
+ REQUIRE(results.at(processors::PutCouchbaseKey::Success).empty());
+ REQUIRE(results.at(processors::PutCouchbaseKey::Failure).empty());
+ REQUIRE(results.at(processors::PutCouchbaseKey::Retry).size() == 1);
+ flow_file = results.at(processors::PutCouchbaseKey::Retry)[0];
+ }
+
+ auto get_collection_parameters = mock_couchbase_cluster_service_->getCollectionParameter();
+ CHECK(get_collection_parameters.bucket_name == expected_call_options.bucket_name);
+ CHECK(get_collection_parameters.collection_name == expected_call_options.collection_name);
+ CHECK(get_collection_parameters.scope_name == expected_call_options.scope_name);
+
+ auto upsert_parameters = mock_couchbase_cluster_service_->getUpsertParameters();
+ CHECK(upsert_parameters.document_type == expected_call_options.document_type);
+ auto expected_doc_id = expected_call_options.doc_id.empty() ? TEST_UUID : expected_call_options.doc_id;
+ CHECK(upsert_parameters.document_id == expected_doc_id);
+ CHECK(upsert_parameters.buffer == stringToByteVector(input));
+
+ auto upsert_options = upsert_parameters.options.build();
+ CHECK(upsert_options.persist_to == expected_call_options.persist_to);
+ CHECK(upsert_options.replicate_to == expected_call_options.replicate_to);
+
+ if (expected_result != processors::PutCouchbaseKey::Success) {
+ return;
+ }
+
+ CHECK(flow_file->getAttribute("couchbase.bucket").value() == expected_call_options.bucket_name);
+ CHECK(flow_file->getAttribute("couchbase.doc.id").value() == expected_doc_id);
+ CHECK(flow_file->getAttribute("couchbase.doc.cas").value() == std::to_string(COUCHBASE_PUT_RESULT_CAS));
+ CHECK(flow_file->getAttribute("couchbase.doc.sequence.number").value() == std::to_string(COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER));
+ CHECK(flow_file->getAttribute("couchbase.partition.uuid").value() == std::to_string(COUCHBASE_PUT_RESULT_PARTITION_UUID));
+ CHECK(flow_file->getAttribute("couchbase.partition.id").value() == std::to_string(COUCHBASE_PUT_RESULT_PARTITION_ID));
+ }
+
+ protected:
+ minifi::test::SingleProcessorTestController controller_;
+ core::Processor* proc_ = nullptr;
+  std::shared_ptr<MockCouchbaseClusterService> mock_couchbase_cluster_service_;
+};
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "invalid");
+ REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"uuid", TEST_UUID}}}}), minifi::Exception);
+}
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid bucket name", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::BucketName, "");
+ auto results = controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"uuid", TEST_UUID}}}});
+ REQUIRE(results[processors::PutCouchbaseKey::Failure].size() == 1);
+ REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s));
+}
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with default properties", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket");
+ const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
+ auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
+ verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default",
+ ::couchbase::persist_to::none, ::couchbase::replicate_to::none, CouchbaseValueType::Json, ""}, input);
+}
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with optional properties", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket");
+ proc_->setProperty(processors::PutCouchbaseKey::ScopeName, "scope1");
+ proc_->setProperty(processors::PutCouchbaseKey::CollectionName, "collection1");
+ proc_->setProperty(processors::PutCouchbaseKey::DocumentType, "Binary");
+ proc_->setProperty(processors::PutCouchbaseKey::DocumentId, "important_doc");
+ proc_->setProperty(processors::PutCouchbaseKey::PersistTo, "ACTIVE");
+ proc_->setProperty(processors::PutCouchbaseKey::ReplicateTo, "TWO");
+ const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
+ auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
+ verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", ::couchbase::persist_to::active,
+ ::couchbase::replicate_to::two, CouchbaseValueType::Binary, "important_doc"}, input);
+}
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put fails with default properties", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket");
+ mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::FATAL);
+ const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
+ auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
+ verifyResults(results, processors::PutCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none,
+ CouchbaseValueType::Json, ""}, input);
+}
+
+TEST_CASE_METHOD(PutCouchbaseKeyTestController, "FlowFile is transferred to retry relationship when temporary error is returned", "[putcouchbasekey]") {
+ proc_->setProperty(processors::PutCouchbaseKey::BucketName, "mybucket");
+ mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::TEMPORARY);
+ const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
+ auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
+ verifyResults(results, processors::PutCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none,
+ CouchbaseValueType::Json, ""}, input);
+}
+
+} // namespace org::apache::nifi::minifi::couchbase::test
diff --git a/libminifi/include/Exception.h b/libminifi/include/Exception.h
index dff108d842..77756272ff 100644
--- a/libminifi/include/Exception.h
+++ b/libminifi/include/Exception.h
@@ -47,7 +47,7 @@ enum ExceptionType {
};
static const char *ExceptionStr[MAX_EXCEPTION] = { "File Operation", "Flow File Operation", "Processor Operation", "Process Session Operation", "Process Schedule Operation", "Site2Site Protocol",
- "General Operation", "Regex Operation", "Repository Operation", "Parameter Operation"};
+ "General Operation", "Regex Operation", "Repository Operation", "Parameter Operation" };
inline const char *ExceptionTypeToString(ExceptionType type) {
if (type < MAX_EXCEPTION)
diff --git a/libminifi/include/utils/SmallString.h b/libminifi/include/utils/SmallString.h
index 2ff5dd9b60..edf9466d20 100644
--- a/libminifi/include/utils/SmallString.h
+++ b/libminifi/include/utils/SmallString.h
@@ -100,7 +100,7 @@ struct fmt::formatter<org::apache::nifi::minifi::utils::SmallString<N>> {
}
template <typename FormatContext>
- auto format(const org::apache::nifi::minifi::utils::SmallString& small_string, FormatContext& ctx) {
+ auto format(const org::apache::nifi::minifi::utils::SmallString& small_string, FormatContext& ctx) const {
return string_view_formatter.format(small_string.view(), ctx);
}
};
diff --git a/thirdparty/bustache/add-append.patch b/thirdparty/bustache/add-append.patch
new file mode 100644
index 0000000000..d77ebaecab
--- /dev/null
+++ b/thirdparty/bustache/add-append.patch
@@ -0,0 +1,18 @@
+diff --git a/include/bustache/model.hpp b/include/bustache/model.hpp
+index 575969a..07bc89d 100644
+--- a/include/bustache/model.hpp
++++ b/include/bustache/model.hpp
+@@ -330,6 +330,13 @@ namespace bustache::detail
+ buf[count++] = c;
+ }
+
++ void append(const char* const begin, const char* const end)
++ {
++ for (const char* it = begin; it != end; ++it) {
++ push_back(*it);
++ }
++ }
++
+ void flush() { os(buf, count); }
+
+ std::size_t count = 0;
diff --git a/thirdparty/couchbase/remove-thirdparty.patch b/thirdparty/couchbase/remove-thirdparty.patch
new file mode 100644
index 0000000000..732576f14f
--- /dev/null
+++ b/thirdparty/couchbase/remove-thirdparty.patch
@@ -0,0 +1,172 @@
+diff --git a/cmake/ThirdPartyDependencies.cmake b/cmake/ThirdPartyDependencies.cmake
+index f02af02..f83c181 100644
+--- a/cmake/ThirdPartyDependencies.cmake
++++ b/cmake/ThirdPartyDependencies.cmake
+@@ -3,73 +3,30 @@
+ include(cmake/CPM.cmake)
+
+ # https://cmake.org/cmake/help/v3.28/policy/CMP0063.html
+ set(CMAKE_POLICY_DEFAULT_CMP0063 NEW)
+
+ function(declare_system_library target)
+ message(STATUS "Declaring system library ${target}")
+ get_target_property(target_aliased_name ${target} ALIASED_TARGET)
+ if(target_aliased_name)
+ set(target ${target_aliased_name})
+ endif()
+ set_target_properties(${target} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES
+ $<TARGET_PROPERTY:${target},INTERFACE_INCLUDE_DIRECTORIES>)
+ endfunction()
+
+-if(NOT TARGET fmt::fmt)
+- # https://github.com/fmtlib/fmt/releases
+- cpmaddpackage(
+- NAME
+- fmt
+- GIT_TAG
+- 11.0.1
+- VERSION
+- 11.0.1
+- GITHUB_REPOSITORY
+- "fmtlib/fmt"
+- EXCLUDE_FROM_ALL ON
+- OPTIONS
+- "FMT_INSTALL OFF"
+- # Unicode support for MSVC enabled in CompilerWarnings.cmake
+- "FMT_UNICODE OFF"
+- "FMT_DOC OFF"
+- "BUILD_SHARED_LIBS OFF"
+- "CMAKE_C_VISIBILITY_PRESET hidden"
+- "CMAKE_CXX_VISIBILITY_PRESET hidden"
+- "CMAKE_POSITION_INDEPENDENT_CODE ON")
+-endif()
+-
+-if(NOT TARGET spdlog::spdlog)
+- # https://github.com/gabime/spdlog/releases
+- cpmaddpackage(
+- NAME
+- spdlog
+- VERSION
+- 1.14.1
+- GITHUB_REPOSITORY
+- "gabime/spdlog"
+- EXCLUDE_FROM_ALL ON
+- OPTIONS
+- "SPDLOG_INSTALL OFF"
+- "BUILD_SHARED_LIBS OFF"
+- "CMAKE_C_VISIBILITY_PRESET hidden"
+- "CMAKE_CXX_VISIBILITY_PRESET hidden"
+- "CMAKE_POSITION_INDEPENDENT_CODE ON"
+- "SPDLOG_BUILD_SHARED OFF"
+- "SPDLOG_FMT_EXTERNAL ON")
+-endif()
+-
+ if(NOT TARGET Microsoft.GSL::GSL)
+ # https://github.com/microsoft/GSL/releases
+ cpmaddpackage(
+ NAME
+ gsl
+ VERSION
+ 4.0.0
+ GITHUB_REPOSITORY
+ "microsoft/gsl"
+ EXCLUDE_FROM_ALL ON
+ OPTIONS
+ "GSL_INSTALL OFF"
+ "CMAKE_C_VISIBILITY_PRESET hidden"
+ "CMAKE_CXX_VISIBILITY_PRESET hidden"
+ "CMAKE_POSITION_INDEPENDENT_CODE ON")
+@@ -159,93 +116,24 @@ if(NOT TARGET taocpp::json)
+ "CMAKE_POSITION_INDEPENDENT_CODE ON"
+ "BUILD_SHARED_LIBS OFF"
+ "PEGTL_INSTALL ${COUCHBASE_CXX_CLIENT_INSTALL}"
+ "PEGTL_INSTALL_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/pegtl"
+ "PEGTL_INSTALL_DOC_DIR ${CMAKE_INSTALL_DATAROOTDIR}/doc/tao/pegtl"
+ "PEGTL_BUILD_TESTS OFF"
+ "PEGTL_BUILD_EXAMPLES OFF"
+ "PEGTL_USE_BOOST_FILESYSTEM OFF"
+ "TAOCPP_JSON_INSTALL ${COUCHBASE_CXX_CLIENT_INSTALL}"
+ "TAOCPP_JSON_INSTALL_CMAKE_DIR ${CMAKE_INSTALL_LIBDIR}/cmake/taocpp-json"
+ "TAOCPP_JSON_INSTALL_DOC_DIR ${CMAKE_INSTALL_DATAROOTDIR}/doc/tao/json"
+ "TAOCPP_JSON_BUILD_TESTS OFF"
+ "TAOCPP_JSON_BUILD_EXAMPLES OFF")
+ endif()
+
+-
+-if(NOT TARGET asio::asio)
+- # https://github.com/chriskohlhoff/asio/tags
+- cpmaddpackage(
+- NAME
+- asio
+- GIT_TAG
+- asio-1-31-0
+- VERSION
+- 1.31.0
+- GITHUB_REPOSITORY
+- "chriskohlhoff/asio"
+- EXCLUDE_FROM_ALL ON)
+-endif()
+-
+-# ASIO doesn't use CMake, we have to configure it manually. Extra notes for using on Windows:
+-#
+-# 1) If _WIN32_WINNT is not set, ASIO assumes _WIN32_WINNT=0x0501, i.e. Windows XP target, which is definitely not the
+-# platform which most users target.
+-#
+-# 2) WIN32_LEAN_AND_MEAN is defined to make Winsock2 work.
+-if(asio_ADDED)
+- add_library(asio STATIC ${asio_SOURCE_DIR}/asio/src/asio.cpp ${asio_SOURCE_DIR}/asio/src/asio_ssl.cpp)
+-
+- target_include_directories(asio SYSTEM PUBLIC ${asio_SOURCE_DIR}/asio/include)
+- target_compile_definitions(asio PRIVATE ASIO_STANDALONE=1 ASIO_NO_DEPRECATED=1 ASIO_SEPARATE_COMPILATION=1)
+- target_link_libraries(asio PRIVATE Threads::Threads OpenSSL::SSL OpenSSL::Crypto)
+- set_target_properties(
+- asio
+- PROPERTIES C_VISIBILITY_PRESET hidden
+- CXX_VISIBILITY_PRESET hidden
+- POSITION_INDEPENDENT_CODE TRUE)
+-
+- if(WIN32)
+- # macro see @ https://stackoverflow.com/a/40217291/1746503
+- macro(get_win32_winnt version)
+- if(CMAKE_SYSTEM_VERSION)
+- set(ver ${CMAKE_SYSTEM_VERSION})
+- string(REGEX MATCH "^([0-9]+).([0-9])" ver ${ver})
+- string(REGEX MATCH "^([0-9]+)" verMajor ${ver})
+- # Check for Windows 10, b/c we'll need to convert to hex 'A'.
+- if("${verMajor}" MATCHES "10")
+- set(verMajor "A")
+- string(REGEX REPLACE "^([0-9]+)" ${verMajor} ver ${ver})
+- endif("${verMajor}" MATCHES "10")
+- # Remove all remaining '.' characters.
+- string(REPLACE "." "" ver ${ver})
+- # Prepend each digit with a zero.
+- string(REGEX REPLACE "([0-9A-Z])" "0\\1" ver ${ver})
+- set(${version} "0x${ver}")
+- endif()
+- endmacro()
+-
+- if(NOT DEFINED _WIN32_WINNT)
+- get_win32_winnt(ver)
+- set(_WIN32_WINNT ${ver})
+- endif()
+-
+- message(STATUS "Set _WIN32_WINNT=${_WIN32_WINNT}")
+-
+- target_compile_definitions(asio INTERFACE _WIN32_WINNT=${_WIN32_WINNT} WIN32_LEAN_AND_MEAN)
+- endif()
+-
+- add_library(asio::asio ALIAS asio)
+-endif()
+-
+ add_library(jsonsl OBJECT ${PROJECT_SOURCE_DIR}/third_party/jsonsl/jsonsl.c)
+ set_target_properties(jsonsl PROPERTIES C_VISIBILITY_PRESET hidden POSITION_INDEPENDENT_CODE TRUE)
+ target_include_directories(jsonsl SYSTEM PUBLIC ${PROJECT_SOURCE_DIR}/third_party/jsonsl)
+
+ declare_system_library(snappy)
+ declare_system_library(llhttp::llhttp)
+ declare_system_library(hdr_histogram_static)
+ declare_system_library(Microsoft.GSL::GSL)
+-declare_system_library(spdlog::spdlog)
+-declare_system_library(fmt::fmt)
+-declare_system_library(asio)
+ declare_system_library(taocpp::json)
diff --git a/win_build_vs.bat b/win_build_vs.bat
index a0058a6d74..f0699b8854 100644
--- a/win_build_vs.bat
+++ b/win_build_vs.bat
@@ -45,6 +45,7 @@ set enable_prometheus=ON
set enable_gcp=ON
set enable_elastic=ON
set enable_grafana_loki=OFF
+set enable_couchbase=OFF
set test_custom_wel_provider=OFF
set generator="Visual Studio 17 2022"
set cpack=OFF
@@ -83,6 +84,7 @@ for %%x in (%*) do (
if [%%~x] EQU [/NO_OPS] set enable_ops=OFF
if [%%~x] EQU [/NO_PYTHON_SCRIPTING] set enable_python_scripting=OFF
if [%%~x] EQU [/LOKI] set enable_grafana_loki=ON
+ if [%%~x] EQU [/COUCHBASE] set enable_couchbase=ON
if [%%~x] EQU [/32] set build_platform=Win32
if [%%~x] EQU [/D] set cmake_build_type=RelWithDebInfo
if [%%~x] EQU [/DD] set cmake_build_type=Debug
@@ -111,7 +113,7 @@ cmake -G %generator% %build_platform_cmd% -DMINIFI_INCLUDE_VC_REDIST_MERGE_MODUL
-DENABLE_OPENCV=%enable_opencv% -DENABLE_PROMETHEUS=%enable_prometheus% -DENABLE_ELASTICSEARCH=%enable_elastic% -DUSE_SHARED_LIBS=OFF -DENABLE_CONTROLLER=OFF ^
-DENABLE_BUSTACHE=%enable_bustache% -DENABLE_ENCRYPT_CONFIG=%enable_encrypt_config% -DENABLE_LUA_SCRIPTING=%enable_lua_scripting% -DENABLE_SMB=%enable_smb% ^
-DENABLE_MQTT=%enable_mqtt% -DENABLE_OPC=%enable_opc% -DENABLE_OPS=%enable_ops% ^
- -DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_GRAFANA_LOKI=%enable_grafana_loki% ^
+ -DENABLE_PYTHON_SCRIPTING=%enable_python_scripting% -DENABLE_GRAFANA_LOKI=%enable_grafana_loki% -DENABLE_COUCHBASE=%enable_couchbase% ^
-DBUILD_ROCKSDB=ON -DUSE_SYSTEM_UUID=OFF -DENABLE_LIBARCHIVE=ON -DENABLE_WEL=ON -DMINIFI_FAIL_ON_WARNINGS=OFF -DSKIP_TESTS=%skiptests% -DMINIFI_INCLUDE_VC_REDIST_DLLS=%vc_redist% ^
%strict_gsl_checks% -DMINIFI_INCLUDE_UCRT_DLLS=%ucrt% %sccache_arg% %EXTRA_CMAKE_ARGUMENTS% "%scriptdir%" && %buildcmd%
IF %ERRORLEVEL% NEQ 0 EXIT /b %ERRORLEVEL%