diff --git a/docs/modules/ROOT/examples/jet b/docs/modules/ROOT/examples/jet new file mode 120000 index 000000000..5740c6ec6 --- /dev/null +++ b/docs/modules/ROOT/examples/jet @@ -0,0 +1 @@ +../../../../jet \ No newline at end of file diff --git a/jet/operator-jet-job-snapshot/hz.yaml b/jet/operator-jet-job-snapshot/hz.yaml new file mode 100644 index 000000000..a2dc85990 --- /dev/null +++ b/jet/operator-jet-job-snapshot/hz.yaml @@ -0,0 +1,10 @@ +apiVersion: hazelcast.com/v1alpha1 +kind: Hazelcast +metadata: + name: hazelcast +spec: + clusterSize: 3 + licenseKeySecretName: hazelcast-license-key + jet: + enabled: true + resourceUploadEnabled: true diff --git a/jet/operator-jet-job-snapshot/jet-pipelines/pom.xml b/jet/operator-jet-job-snapshot/jet-pipelines/pom.xml new file mode 100644 index 000000000..605a2790c --- /dev/null +++ b/jet/operator-jet-job-snapshot/jet-pipelines/pom.xml @@ -0,0 +1,39 @@ + + 4.0.0 + + jet-pipelines + + + jet + com.hazelcast.samples.jet + 0.1-SNAPSHOT + ../../pom.xml + + + jet-pipelines + + + + com.hazelcast + hazelcast + ${hazelcast.version} + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + + + diff --git a/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV1.java b/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV1.java new file mode 100644 index 000000000..9c1314f14 --- /dev/null +++ b/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV1.java @@ -0,0 +1,32 @@ +package org.examples.jet.snapshot; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.jet.pipeline.JournalInitialPosition; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.Sources; + +public class JobV1 { + + public static void main(String[] args) { + var p = Pipeline.create(); + var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST); + var loggerSink = Sinks.logger(); + p.readFrom(transactionSource) + .withIngestionTimestamps() + .setName("Emit Transactions") + .map(e -> { + System.out.printf("The transaction '%s' is being executed in 'job-v1'\n", e.getKey()); + // execute the transaction + return String.format("[Job V1] transaction:'%s' payload:'%s'", e.getKey(), e.getValue()); + }) + .setName("Apply Transactions") + .writeTo(loggerSink) + .setName("Log Transactions"); + + HazelcastInstance hz = Hazelcast.bootstrappedInstance(); + hz.getJet().newJob(p); + } + +} diff --git a/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV2.java b/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV2.java new file mode 100644 index 000000000..6a4e69231 --- /dev/null +++ b/jet/operator-jet-job-snapshot/jet-pipelines/src/main/java/org/examples/jet/snapshot/JobV2.java @@ -0,0 +1,32 @@ +package org.examples.jet.snapshot; + +import com.hazelcast.core.Hazelcast; +import com.hazelcast.core.HazelcastInstance; +import com.hazelcast.jet.pipeline.JournalInitialPosition; +import com.hazelcast.jet.pipeline.Pipeline; +import com.hazelcast.jet.pipeline.Sinks; +import com.hazelcast.jet.pipeline.Sources; + +public class JobV2 { + + public static void main(String[] args) { + var p = Pipeline.create(); + var transactionSource = Sources.mapJournal("transaction", JournalInitialPosition.START_FROM_OLDEST); + var loggerSink = Sinks.logger(); + p.readFrom(transactionSource) + .withIngestionTimestamps() + .setName("Emit Transactions") + .map(e -> { + System.out.printf("The transaction '%s' is being executed in 'job-v2'\n", e.getKey()); + // execute the transaction + return String.format("[Job V2] transaction:'%s' payload:'%s'", e.getKey(), e.getValue()); + }) + .setName("Apply Transactions") + .writeTo(loggerSink) + .setName("Log Transactions"); + + HazelcastInstance hz = Hazelcast.bootstrappedInstance(); + hz.getJet().newJob(p); + } + +} diff --git a/jet/operator-jet-job-snapshot/job-v1.yaml b/jet/operator-jet-job-snapshot/job-v1.yaml new file mode 100644 index 000000000..d966aac1c --- /dev/null +++ b/jet/operator-jet-job-snapshot/job-v1.yaml @@ -0,0 +1,12 @@ +apiVersion: hazelcast.com/v1alpha1 +kind: JetJob +metadata: + name: transaction-v1 +spec: + hazelcastResourceName: hazelcast + state: Running + jarName: jet-pipelines-1.0-SNAPSHOT.jar + mainClass: org.examples.jet.snapshot.JobV1 + bucketConfig: + bucketURI: '' + secretName: '' diff --git a/jet/operator-jet-job-snapshot/job-v2.yaml b/jet/operator-jet-job-snapshot/job-v2.yaml new file mode 100644 index 000000000..c1b01ee65 --- /dev/null +++ b/jet/operator-jet-job-snapshot/job-v2.yaml @@ -0,0 +1,13 @@ +apiVersion: hazelcast.com/v1alpha1 +kind: JetJob +metadata: + name: transaction-v2 +spec: + hazelcastResourceName: hazelcast + state: Running + jarName: jet-pipelines-1.0-SNAPSHOT.jar + mainClass: org.examples.jet.snapshot.JobV2 + initialSnapshotResourceName: snapshot-transaction + bucketConfig: + bucketURI: '' + secretName: '' diff --git a/jet/operator-jet-job-snapshot/map.yaml b/jet/operator-jet-job-snapshot/map.yaml new file mode 100644 index 000000000..5b3474cfd --- /dev/null +++ b/jet/operator-jet-job-snapshot/map.yaml @@ -0,0 +1,8 @@ +apiVersion: hazelcast.com/v1alpha1 +kind: Map +metadata: + name: transaction +spec: + hazelcastResourceName: hazelcast + eventJournal: + capacity: 9000 diff --git a/jet/operator-jet-job-snapshot/snapshot.yaml b/jet/operator-jet-job-snapshot/snapshot.yaml new file mode 100644 index 000000000..666a9e2ad --- /dev/null +++ b/jet/operator-jet-job-snapshot/snapshot.yaml @@ -0,0 +1,7 @@ +apiVersion: hazelcast.com/v1alpha1 +kind: JetJobSnapshot +metadata: + name: snapshot-transaction +spec: + jetJobResourceName: transaction-v1 + cancelJob: true diff --git a/jet/pom.xml b/jet/pom.xml index 6a17df10d..feac29a68 100644 --- a/jet/pom.xml +++ b/jet/pom.xml @@ -67,6 +67,7 @@ wordcount-compute-isolation cdc mongodb + operator-jet-job-snapshot/jet-pipelines