Skip to content

Commit 4ef7d72

Browse files
Merge branch 'main' into compute_disk_start_stop_replication
2 parents b228cd3 + 99e983e commit 4ef7d72

File tree

9 files changed

+606
-20
lines changed

9 files changed

+606
-20
lines changed

dataflow/snippets/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,10 @@
163163
<version>${apache_beam.version}</version>
164164
</dependency>
165165
<dependency>
166-
<!-- For sending Kafka messages in the integration test -->
166+
<!-- For initializing the KafkaIO connector -->
167167
<groupId>org.apache.kafka</groupId>
168168
<artifactId>kafka-clients</artifactId>
169169
<version>3.8.0</version>
170-
<scope>test</scope>
171170
</dependency>
172171
<dependency>
173172
<!-- For running containerized Kafka instance in the integration test -->
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.dataflow;
18+
19+
// [START dataflow_kafka_read_multi_topic]
20+
import java.util.List;
21+
import org.apache.beam.sdk.Pipeline;
22+
import org.apache.beam.sdk.PipelineResult;
23+
import org.apache.beam.sdk.io.TextIO;
24+
import org.apache.beam.sdk.io.kafka.KafkaIO;
25+
import org.apache.beam.sdk.options.Description;
26+
import org.apache.beam.sdk.options.PipelineOptionsFactory;
27+
import org.apache.beam.sdk.options.StreamingOptions;
28+
import org.apache.beam.sdk.transforms.Filter;
29+
import org.apache.beam.sdk.transforms.MapElements;
30+
import org.apache.beam.sdk.values.TypeDescriptors;
31+
import org.apache.kafka.common.serialization.LongDeserializer;
32+
import org.apache.kafka.common.serialization.StringDeserializer;
33+
import org.joda.time.Duration;
34+
import org.joda.time.Instant;
35+
36+
public class KafkaReadTopics {
37+
38+
// [END dataflow_kafka_read_multi_topic]
39+
public interface Options extends StreamingOptions {
40+
@Description("The Kafka bootstrap server. Example: localhost:9092")
41+
String getBootstrapServer();
42+
43+
void setBootstrapServer(String value);
44+
45+
@Description("The first Kafka topic to read from.")
46+
String getTopic1();
47+
48+
void setTopic1(String value);
49+
50+
@Description("The second Kafka topic to read from.")
51+
String getTopic2();
52+
53+
void setTopic2(String value);
54+
}
55+
56+
public static PipelineResult.State main(String[] args) {
57+
// Parse the pipeline options passed into the application. Example:
58+
// --bootstrap_servers=$BOOTSTRAP_SERVERS --topic=$KAFKA_TOPIC --outputPath=$OUTPUT_FILE
59+
// For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
60+
var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
61+
options.setStreaming(true);
62+
63+
Pipeline pipeline = createPipeline(options);
64+
return pipeline.run().waitUntilFinish();
65+
}
66+
67+
// [START dataflow_kafka_read_multi_topic]
68+
public static Pipeline createPipeline(Options options) {
69+
String topic1 = options.getTopic1();
70+
String topic2 = options.getTopic2();
71+
72+
// Build the pipeline.
73+
var pipeline = Pipeline.create(options);
74+
var allTopics = pipeline
75+
.apply(KafkaIO.<Long, String>read()
76+
.withTopics(List.of(topic1, topic2))
77+
.withBootstrapServers(options.getBootstrapServer())
78+
.withKeyDeserializer(LongDeserializer.class)
79+
.withValueDeserializer(StringDeserializer.class)
80+
.withMaxReadTime(Duration.standardSeconds(10))
81+
.withStartReadTime(Instant.EPOCH)
82+
);
83+
84+
// Create separate pipeline branches for each topic.
85+
// The first branch filters on topic1.
86+
allTopics
87+
.apply(Filter.by(record -> record.getTopic().equals(topic1)))
88+
.apply(MapElements
89+
.into(TypeDescriptors.strings())
90+
.via(record -> record.getKV().getValue()))
91+
.apply(TextIO.write()
92+
.to(topic1)
93+
.withSuffix(".txt")
94+
.withNumShards(1)
95+
);
96+
97+
// The second branch filters on topic2.
98+
allTopics
99+
.apply(Filter.by(record -> record.getTopic().equals(topic2)))
100+
.apply(MapElements
101+
.into(TypeDescriptors.strings())
102+
.via(record -> record.getKV().getValue()))
103+
.apply(TextIO.write()
104+
.to(topic2)
105+
.withSuffix(".txt")
106+
.withNumShards(1)
107+
);
108+
return pipeline;
109+
}
110+
}
111+
// [END dataflow_kafka_read_multi_topic]

dataflow/snippets/src/test/java/com/example/dataflow/KafkaReadIT.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,13 @@
3939
import org.testcontainers.utility.DockerImageName;
4040

4141
public class KafkaReadIT {
42-
private static final String TOPIC_NAME = "topic-" + UUID.randomUUID();
42+
private static final String[] TOPIC_NAMES = {
43+
"topic-" + UUID.randomUUID(),
44+
"topic-" + UUID.randomUUID()
45+
};
4346

44-
private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString();
45-
private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt";
47+
// The TextIO connector appends this suffix to the pipeline output file.
48+
private static final String OUTPUT_FILE_SUFFIX = "-00000-of-00001.txt";
4649

4750
private static KafkaContainer kafka;
4851
private static String bootstrapServer;
@@ -54,40 +57,61 @@ public void setUp() throws ExecutionException, InterruptedException {
5457
kafka.start();
5558
bootstrapServer = kafka.getBootstrapServers();
5659

57-
// Create a topic.
60+
// Create topics.
5861
Properties properties = new Properties();
5962
properties.put("bootstrap.servers", bootstrapServer);
6063
AdminClient adminClient = AdminClient.create(properties);
61-
var topic = new NewTopic(TOPIC_NAME, 1, (short) 1);
62-
adminClient.createTopics(Arrays.asList(topic));
64+
for (String topicName : TOPIC_NAMES) {
65+
var topic = new NewTopic(topicName, 1, (short) 1);
66+
adminClient.createTopics(Arrays.asList(topic));
67+
}
6368

64-
// Send a message to the topic.
65-
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
69+
// Send messages to the topics.
70+
properties.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
6671
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
67-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
68-
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-0", "event-0");
69-
Future future = producer.send(record);
70-
future.get();
72+
KafkaProducer<Long, String> producer = new KafkaProducer<>(properties);
73+
for (String topicName : TOPIC_NAMES) {
74+
var record = new ProducerRecord<>(topicName, 0L, topicName + "-event-0");
75+
Future future = producer.send(record);
76+
future.get();
77+
}
7178
}
7279

7380
@After
7481
public void tearDown() throws IOException {
7582
kafka.stop();
76-
Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
83+
for (String topicName : TOPIC_NAMES) {
84+
Files.deleteIfExists(Paths.get(topicName + OUTPUT_FILE_SUFFIX));
85+
}
7786
}
7887

7988
@Test
8089
public void testApacheKafkaRead() throws IOException {
8190
PipelineResult.State state = KafkaRead.main(new String[] {
8291
"--runner=DirectRunner",
8392
"--bootstrapServer=" + bootstrapServer,
84-
"--topic=" + TOPIC_NAME,
85-
"--outputPath=" + OUTPUT_FILE_NAME_PREFIX
93+
"--topic=" + TOPIC_NAMES[0],
94+
"--outputPath=" + TOPIC_NAMES[0] // Use the topic name as the output file name.
8695
});
8796
assertEquals(PipelineResult.State.DONE, state);
97+
verifyOutput(TOPIC_NAMES[0]);
98+
}
99+
100+
@Test
101+
public void testApacheKafkaReadTopics() throws IOException {
102+
PipelineResult.State state = KafkaReadTopics.main(new String[] {
103+
"--runner=DirectRunner",
104+
"--bootstrapServer=" + bootstrapServer,
105+
"--topic1=" + TOPIC_NAMES[0],
106+
"--topic2=" + TOPIC_NAMES[1]
107+
});
108+
assertEquals(PipelineResult.State.DONE, state);
109+
verifyOutput(TOPIC_NAMES[0]);
110+
verifyOutput(TOPIC_NAMES[1]);
111+
}
88112

89-
// Verify the pipeline wrote the output.
90-
String output = Files.readString(Paths.get(OUTPUT_FILE_NAME));
91-
assertTrue(output.contains("event-0"));
113+
private void verifyOutput(String topic) throws IOException {
114+
String output = Files.readString(Paths.get(topic + OUTPUT_FILE_SUFFIX));
115+
assertTrue(output.contains(topic + "-event-0"));
92116
}
93117
}

security-command-center/snippets/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@
4545
<version>2.45.0</version>
4646
</dependency>
4747

48+
<dependency>
49+
<groupId>com.google.cloud</groupId>
50+
<artifactId>google-cloud-securitycentermanagement</artifactId>
51+
<version>0.20.0</version>
52+
</dependency>
53+
54+
<dependency>
55+
<groupId>com.google.api.grpc</groupId>
56+
<artifactId>proto-google-cloud-securitycentermanagement-v1</artifactId>
57+
<version>0.20.0</version>
58+
</dependency>
59+
4860
<dependency>
4961
<groupId>com.google.cloud</groupId>
5062
<artifactId>google-cloud-pubsub</artifactId>
@@ -80,4 +92,13 @@
8092
<scope>test</scope>
8193
</dependency>
8294
</dependencies>
95+
<build>
96+
<plugins>
97+
<plugin>
98+
<groupId>org.apache.maven.plugins</groupId>
99+
<artifactId>maven-surefire-plugin</artifactId>
100+
<version>3.2.5</version>
101+
</plugin>
102+
</plugins>
103+
</build>
83104
</project>
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package management.api;
18+
19+
// [START securitycenter_create_security_health_analytics_custom_module]
20+
import com.google.cloud.securitycentermanagement.v1.CreateSecurityHealthAnalyticsCustomModuleRequest;
21+
import com.google.cloud.securitycentermanagement.v1.CustomConfig;
22+
import com.google.cloud.securitycentermanagement.v1.CustomConfig.ResourceSelector;
23+
import com.google.cloud.securitycentermanagement.v1.CustomConfig.Severity;
24+
import com.google.cloud.securitycentermanagement.v1.SecurityCenterManagementClient;
25+
import com.google.cloud.securitycentermanagement.v1.SecurityHealthAnalyticsCustomModule;
26+
import com.google.cloud.securitycentermanagement.v1.SecurityHealthAnalyticsCustomModule.EnablementState;
27+
import com.google.type.Expr;
28+
import java.io.IOException;
29+
30+
public class CreateSecurityHealthAnalyticsCustomModule {
31+
32+
public static void main(String[] args) throws IOException {
33+
// https://cloud.google.com/security-command-center/docs/reference/security-center-management/rest/v1/organizations.locations.securityHealthAnalyticsCustomModules/create
34+
// replace "project_id" with a real project ID
35+
String parent = String.format("projects/%s/locations/%s", "project_id", "global");
36+
37+
String customModuleDisplayName = "custom_module_display_name";
38+
39+
createSecurityHealthAnalyticsCustomModule(parent, customModuleDisplayName);
40+
}
41+
42+
public static SecurityHealthAnalyticsCustomModule createSecurityHealthAnalyticsCustomModule(
43+
String parent, String customModuleDisplayName) throws IOException {
44+
45+
// Initialize client that will be used to send requests. This client only needs
46+
// to be created
47+
// once, and can be reused for multiple requests.
48+
try (SecurityCenterManagementClient client = SecurityCenterManagementClient.create()) {
49+
50+
String name =
51+
String.format("%s/securityHealthAnalyticsCustomModules/%s", parent, "custom_module");
52+
53+
// define the CEL expression here, change it according to the your requirements
54+
Expr expr =
55+
Expr.newBuilder()
56+
.setExpression(
57+
"has(resource.rotationPeriod) && (resource.rotationPeriod > "
58+
+ "duration('2592000s'))")
59+
.build();
60+
61+
// define the resource selector
62+
ResourceSelector resourceSelector =
63+
ResourceSelector.newBuilder()
64+
.addResourceTypes("cloudkms.googleapis.com/CryptoKey")
65+
.build();
66+
67+
// define the custom module configuration, update the severity, description,
68+
// recommendation below
69+
CustomConfig customConfig =
70+
CustomConfig.newBuilder()
71+
.setPredicate(expr)
72+
.setResourceSelector(resourceSelector)
73+
.setSeverity(Severity.MEDIUM)
74+
.setDescription("add your description here")
75+
.setRecommendation("add your recommendation here")
76+
.build();
77+
78+
// define the security health analytics custom module configuration, update the
79+
// EnablementState below
80+
SecurityHealthAnalyticsCustomModule securityHealthAnalyticsCustomModule =
81+
SecurityHealthAnalyticsCustomModule.newBuilder()
82+
.setName(name)
83+
.setDisplayName(customModuleDisplayName)
84+
.setEnablementState(EnablementState.ENABLED)
85+
.setCustomConfig(customConfig)
86+
.build();
87+
88+
CreateSecurityHealthAnalyticsCustomModuleRequest request =
89+
CreateSecurityHealthAnalyticsCustomModuleRequest.newBuilder()
90+
.setParent(parent)
91+
.setSecurityHealthAnalyticsCustomModule(securityHealthAnalyticsCustomModule)
92+
.build();
93+
94+
SecurityHealthAnalyticsCustomModule response =
95+
client.createSecurityHealthAnalyticsCustomModule(request);
96+
97+
return response;
98+
}
99+
}
100+
}
101+
// [END securitycenter_create_security_health_analytics_custom_module]

0 commit comments

Comments
 (0)