
Commit 4dfce45

adambenhassen and Gemini authored and committed
feat: add redpanda buffering layer with split ingress/egress otel collectors
1 parent a606559 commit 4dfce45

File tree

11 files changed (+445, -61 lines)


deployment/index.ts

Lines changed: 6 additions & 1 deletion
@@ -20,6 +20,7 @@ import { deployPostgres } from './services/postgres';
 import { deployProxy } from './services/proxy';
 import { deployPublicGraphQLAPIGateway } from './services/public-graphql-api-gateway';
 import { deployRedis } from './services/redis';
+import { deployRedpanda } from './services/redpanda';
 import { deployS3, deployS3AuditLog, deployS3Mirror } from './services/s3';
 import { deploySchema } from './services/schema';
 import { configureSentry } from './services/sentry';
@@ -78,6 +79,7 @@ const clickhouse = deployClickhouse();
 const postgres = deployPostgres();
 const redis = deployRedis({ environment });
 const kafka = deployKafka();
+const redpanda = deployRedpanda();
 const s3 = deployS3();
 const s3Mirror = deployS3Mirror();
 const s3AuditLog = deployS3AuditLog();
@@ -284,6 +286,7 @@ const otelCollector = deployOTELCollector({
   graphql,
   dbMigrations,
   clickhouse,
+  redpanda,
   image: docker.factory.getImageId('otel-collector', imagesTag),
   docker,
 });
@@ -344,5 +347,7 @@ export const schemaApiServiceId = schema.service.id;
 export const webhooksApiServiceId = webhooks.service.id;
 
 export const appId = app.deployment.id;
-export const otelCollectorId = otelCollector.deployment.id;
+export const otelCollectorIngressId = otelCollector.ingress.deployment.id;
+export const otelCollectorEgressId = otelCollector.egress.deployment.id;
+export const redpandaStatefulSetId = redpanda.statefulSet.id;
 export const publicIp = proxy.get()!.status.loadBalancer.ingress[0].ip;

deployment/services/otel-collector.ts

Lines changed: 47 additions & 4 deletions
@@ -5,6 +5,7 @@ import { DbMigrations } from './db-migrations';
 import { Docker } from './docker';
 import { Environment } from './environment';
 import { GraphQL } from './graphql';
+import { Redpanda } from './redpanda';
 
 export type OTELCollector = ReturnType<typeof deployOTELCollector>;
 
@@ -15,9 +16,13 @@ export function deployOTELCollector(args: {
   clickhouse: Clickhouse;
   dbMigrations: DbMigrations;
   graphql: GraphQL;
+  redpanda: Redpanda;
 }) {
-  return new ServiceDeployment(
-    'otel-collector',
+  const kafkaBroker = args.redpanda.brokerEndpoint;
+
+  // Ingress: OTLP -> Redpanda
+  const ingress = new ServiceDeployment(
+    'otel-collector-ingress',
     {
       image: args.image,
       imagePullSecret: args.docker.secret,
@@ -26,6 +31,7 @@ export function deployOTELCollector(args: {
         HIVE_OTEL_AUTH_ENDPOINT: serviceLocalEndpoint(args.graphql.service).apply(
           value => value + '/otel-auth',
         ),
+        KAFKA_BROKER: kafkaBroker,
       },
       /**
        * We are using the healthcheck extension.
@@ -40,11 +46,40 @@ export function deployOTELCollector(args: {
       pdb: true,
       availabilityOnEveryNode: true,
       port: 4318,
-      memoryLimit: args.environment.podsConfig.tracingCollector.memoryLimit,
+      memoryLimit: '512Mi',
       autoScaling: {
         maxReplicas: args.environment.podsConfig.tracingCollector.maxReplicas,
         cpu: {
-          limit: args.environment.podsConfig.tracingCollector.cpuLimit,
+          limit: '500m',
+          cpuAverageToScale: 80,
+        },
+      },
+    },
+    [args.dbMigrations],
+  ).deploy();
+
+  // Egress: Redpanda -> ClickHouse
+  const egress = new ServiceDeployment(
+    'otel-collector-egress',
+    {
+      image: args.image,
+      imagePullSecret: args.docker.secret,
+      env: {
+        ...args.environment.envVars,
+        KAFKA_BROKER: kafkaBroker,
+      },
+      probePort: 13133,
+      readinessProbe: '/',
+      livenessProbe: '/',
+      startupProbe: '/',
+      exposesMetrics: true,
+      replicas: args.environment.podsConfig.tracingCollector.maxReplicas,
+      pdb: true,
+      memoryLimit: '512Mi',
+      autoScaling: {
+        maxReplicas: args.environment.podsConfig.tracingCollector.maxReplicas,
+        cpu: {
+          limit: '500m',
           cpuAverageToScale: 80,
         },
       },
@@ -57,4 +92,12 @@ export function deployOTELCollector(args: {
     .withSecret('CLICKHOUSE_PASSWORD', args.clickhouse.secret, 'password')
     .withSecret('CLICKHOUSE_PROTOCOL', args.clickhouse.secret, 'protocol')
     .deploy();
+
+  return {
+    ingress,
+    egress,
+    // For backward compatibility, expose ingress as the main deployment
+    deployment: ingress.deployment,
+    service: ingress.service,
+  };
 }

deployment/services/redpanda.ts

Lines changed: 198 additions & 0 deletions
@@ -0,0 +1,198 @@
+import * as k8s from '@pulumi/kubernetes';
+import * as pulumi from '@pulumi/pulumi';
+
+export type Redpanda = ReturnType<typeof deployRedpanda>;
+
+export function deployRedpanda() {
+  const redpandaConfig = new pulumi.Config('redpanda');
+  const replicas = redpandaConfig.getNumber('replicas') || 3;
+  const storageSize = redpandaConfig.get('storageSize') || '20Gi';
+  const memoryLimit = redpandaConfig.get('memoryLimit') || '1Gi';
+  const cpuLimit = redpandaConfig.get('cpuLimit') || '1000m';
+
+  const labels = { app: 'redpanda' };
+
+  // StatefulSet for Redpanda
+  const statefulSet = new k8s.apps.v1.StatefulSet('redpanda', {
+    metadata: {
+      name: 'redpanda',
+    },
+    spec: {
+      serviceName: 'redpanda',
+      replicas,
+      selector: {
+        matchLabels: labels,
+      },
+      template: {
+        metadata: {
+          labels,
+        },
+        spec: {
+          containers: [
+            {
+              name: 'redpanda',
+              image: 'redpandadata/redpanda:v25.3.1',
+              imagePullPolicy: 'Always',
+              resources: {
+                limits: {
+                  cpu: cpuLimit,
+                  memory: memoryLimit,
+                },
+              },
+              args: [
+                'redpanda',
+                'start',
+                '--overprovisioned',
+                '--smp',
+                '1',
+                '--memory',
+                memoryLimit,
+                '--kafka-addr',
+                'PLAINTEXT://0.0.0.0:9092',
+                '--advertise-kafka-addr',
+                pulumi.interpolate`PLAINTEXT://\${HOSTNAME}.redpanda.default.svc.cluster.local:9092`,
+              ],
+              ports: [
+                { containerPort: 9092, name: 'kafka' },
+                { containerPort: 8082, name: 'http' },
+                { containerPort: 33145, name: 'rpc' },
+                { containerPort: 9644, name: 'admin' },
+              ],
+              volumeMounts: [
+                {
+                  name: 'datadir',
+                  mountPath: '/var/lib/redpanda/data',
+                },
+              ],
+              livenessProbe: {
+                httpGet: {
+                  path: '/v1/status/ready',
+                  port: 9644 as any,
+                },
+                initialDelaySeconds: 30,
+                periodSeconds: 10,
+              },
+              readinessProbe: {
+                httpGet: {
+                  path: '/v1/status/ready',
+                  port: 9644 as any,
+                },
+                initialDelaySeconds: 10,
+                periodSeconds: 5,
+              },
+            },
+          ],
+        },
+      },
+      volumeClaimTemplates: [
+        {
+          metadata: {
+            name: 'datadir',
+          },
+          spec: {
+            accessModes: ['ReadWriteOnce'],
+            resources: {
+              requests: {
+                storage: storageSize,
+              },
+            },
+          },
+        },
+      ],
+    },
+  });
+
+  // Headless Service for StatefulSet (used for internal cluster communication)
+  const headlessService = new k8s.core.v1.Service('redpanda-headless', {
+    metadata: {
+      name: 'redpanda',
+    },
+    spec: {
+      clusterIP: 'None',
+      selector: labels,
+      ports: [
+        { name: 'kafka', port: 9092, targetPort: 9092 as any },
+        { name: 'http', port: 8082, targetPort: 8082 as any },
+        { name: 'rpc', port: 33145, targetPort: 33145 as any },
+        { name: 'admin', port: 9644, targetPort: 9644 as any },
+      ],
+    },
+  });
+
+  // ClusterIP Service for clients (load balances across all pods)
+  const clientService = new k8s.core.v1.Service('redpanda-client-service', {
+    metadata: {
+      name: 'redpanda-client',
+    },
+    spec: {
+      type: 'ClusterIP',
+      selector: labels,
+      ports: [
+        { name: 'kafka', port: 9092, targetPort: 9092 as any },
+        { name: 'http', port: 8082, targetPort: 8082 as any },
+      ],
+    },
+  });
+
+  // Create otel-traces topic
+  const topicCreationJob = new k8s.batch.v1.Job(
+    'redpanda-topic-creation',
+    {
+      metadata: {
+        name: 'redpanda-topic-creation',
+      },
+      spec: {
+        template: {
+          spec: {
+            restartPolicy: 'OnFailure',
+            containers: [
+              {
+                name: 'rpk',
+                image: 'redpandadata/redpanda:v25.3.1',
+                imagePullPolicy: 'Always',
+                command: [
+                  '/bin/bash',
+                  '-c',
+                  `
+                  # Wait for Redpanda to be ready
+                  for i in {1..60}; do
+                    if rpk cluster health --brokers redpanda-0.redpanda:9092 2>/dev/null | grep -q 'Healthy'; then
+                      echo "Redpanda cluster is ready"
+                      break
+                    fi
+                    echo "Waiting for Redpanda cluster... ($i/60)"
+                    sleep 5
+                  done
+
+                  # Create topic with partitioning only (no replication)
+                  rpk topic create otel-traces \\
+                    --brokers redpanda-0.redpanda:9092 \\
+                    --replicas 1 \\
+                    --partitions 10 \\
+                    --config retention.ms=2592000000 \\
+                    --config compression.type=snappy \\
+                    --config max.message.bytes=10485760 \\
+                    || echo "Topic may already exist"
+
+                  # Verify topic creation
+                  rpk topic describe otel-traces --brokers redpanda-0.redpanda:9092
+                  `,
+                ],
+              },
+            ],
+          },
+        },
+      },
+    },
+    { dependsOn: [statefulSet, headlessService] },
+  );
+
+  return {
+    statefulSet,
+    headlessService,
+    clientService,
+    topicCreationJob,
+    // Client service endpoint - auto-discovers all brokers
+    brokerEndpoint: 'redpanda-client:9092',
+  };
+}
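
For context: each broker advertises itself through the headless redpanda service, while the brokerEndpoint handed to the collectors points at the load-balanced redpanda-client service. A rough client-side view of what this provisions, assuming the ${HOSTNAME} placeholder expands to the pod name and the default of three replicas (illustrative sketch, not part of the commit):

# Assumed client-side view of the Redpanda cluster created above (sketch only)
bootstrap: redpanda-client:9092                          # ClusterIP service, exported as brokerEndpoint
brokers:                                                 # per-pod addresses behind the headless service,
  - redpanda-0.redpanda.default.svc.cluster.local:9092   # matching --advertise-kafka-addr
  - redpanda-1.redpanda.default.svc.cluster.local:9092
  - redpanda-2.redpanda.default.svc.cluster.local:9092
topic: otel-traces                                       # created by the rpk job: 10 partitions, replicas 1,
                                                         # retention.ms=2592000000 (30 days),
                                                         # max.message.bytes=10485760 (10 MiB)
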
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+dist:
+  version: 0.122.0
+  name: otelcol-custom
+  description: Custom OTel Collector distribution
+  output_path: ./otelcol-custom
+
+receivers:
+  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.122.0
+
+processors:
+  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.122.0
+
+exporters:
+  - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.122.0
+  - gomod:
+      github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.122.0
+
+extensions:
+  - gomod:
+      github.com/open-telemetry/opentelemetry-collector-contrib/extension/healthcheckextension
+      v0.122.0
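
This builder config pulls in the Kafka receiver, the batch processor and the ClickHouse exporter, i.e. the egress build that drains buffered spans from Redpanda into ClickHouse. The runtime pipeline config is not shown in this commit view; a minimal sketch of what it could look like, reusing the KAFKA_BROKER, CLICKHOUSE_PASSWORD and CLICKHOUSE_PROTOCOL values the egress deployment receives (the topic name, the CLICKHOUSE_ENDPOINT variable and the endpoint shape are assumptions):

# Hypothetical egress pipeline (sketch, not part of this commit)
receivers:
  kafka:
    brokers: ["${env:KAFKA_BROKER}"]
    topic: otel-traces                    # assumed to match the topic created by the Redpanda job
    encoding: otlp_proto

processors:
  batch: {}

exporters:
  clickhouse:
    endpoint: "${env:CLICKHOUSE_PROTOCOL}://${env:CLICKHOUSE_ENDPOINT}"   # CLICKHOUSE_ENDPOINT is assumed
    password: "${env:CLICKHOUSE_PASSWORD}"

extensions:
  health_check:
    endpoint: 0.0.0.0:13133               # matches probePort: 13133 in the egress deployment

service:
  extensions: [health_check]
  pipelines:
    traces:
      receivers: [kafka]
      processors: [batch]
      exporters: [clickhouse]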

docker/configs/otel-collector/builder-config.yaml renamed to docker/configs/otel-collector/builder-config-ingress.yaml

Lines changed: 1 addition & 5 deletions
@@ -8,18 +8,14 @@ receivers:
   - gomod: go.opentelemetry.io/collector/receiver/otlpreceiver v0.122.0
 
 processors:
-  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.122.0
   - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.122.0
   - gomod:
       github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor
       v0.122.0
-  - gomod:
-      github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.122.0
 
 exporters:
   - gomod: go.opentelemetry.io/collector/exporter/debugexporter v0.122.0
-  - gomod:
-      github.com/open-telemetry/opentelemetry-collector-contrib/exporter/clickhouseexporter v0.122.0
+  - gomod: github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.122.0
 
 extensions:
   - gomod:
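
With the batch and filter processors and the ClickHouse exporter removed in favor of the Kafka exporter, the ingress build only needs to accept OTLP on port 4318 and forward spans to the buffer. The runtime pipeline is likewise not part of this commit view; a minimal sketch under the same assumptions (topic name assumed; the auth handling behind HIVE_OTEL_AUTH_ENDPOINT and the attributes processor rules are omitted):

# Hypothetical ingress pipeline (sketch, not part of this commit)
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318            # matches the ingress deployment's port

processors:
  memory_limiter:
    check_interval: 1s
    limit_percentage: 80
    spike_limit_percentage: 20

exporters:
  kafka:
    brokers: ["${env:KAFKA_BROKER}"]
    topic: otel-traces                    # assumed; created by the Redpanda topic job
    encoding: otlp_proto

extensions:
  health_check:
    endpoint: 0.0.0.0:13133

service:
  extensions: [health_check]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [kafka]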

0 commit comments
