Skip to content

Commit 411bbaa

Browse files
committed
add kafka
1 parent 0a80263 commit 411bbaa

File tree

15 files changed

+715
-281
lines changed

15 files changed

+715
-281
lines changed

events/Kafka Event Schema.md

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# Release Sync Kafka Event Schema Documentation
2+
3+
## Overview
4+
5+
This schema defines a unified event contract for the PDVD system. It combines software release metadata, storage references for SBOMs, and deployment endpoint information into a single message. This structure ensures that events processed via Kafka trigger the same validation, deduplication, and automated CVE linking logic as the REST API.
6+
7+
---
8+
9+
## Top-Level Properties
10+
11+
* **event_type** (string, Required): Unique identifier for the event logic (e.g., `release.sync.created`).
12+
* **event_id** (string, Required): A unique UUID for message tracking and deduplication.
13+
* **event_time** (string, Required): The ISO 8601 timestamp when the event was generated.
14+
* **synced_at** (string, Optional): The ISO 8601 timestamp of the actual deployment. If omitted, the system defaults to the current processing time.
15+
16+
---
17+
18+
## 1. Release Object
19+
20+
Contains core metadata for the software component.
21+
22+
* **name** (string, Required): The full name of the release (e.g., "org/repo").
23+
* **version** (string, Required): The version string, automatically cleaned and parsed into SemVer components during ingestion.
24+
* **projecttype** (string, Optional): The category of project (e.g., `docker`, `container`, `git`).
25+
* **gitcommit** (string, Optional): The Git SHA associated with the release.
26+
* **dockersha** (string, Optional): The Docker Image Digest.
27+
* **is_public** (boolean, Default: `true`): Visibility flag for the release.
28+
29+
---
30+
31+
## 2. SBOM Reference Object
32+
33+
Describes how to retrieve the SBOM content for security analysis.
34+
35+
* **cid** (string, Required): The IPFS Content Identifier where the JSON SBOM is stored.
36+
* **storage_type** (string, Required): The backend storage provider. Allowed values: `["ipfs", "s3"]`.
37+
* **content_sha** (string, Optional): A SHA256 hash of the SBOM content for integrity verification.
38+
* **uploaded_at** (string, Optional): Timestamp when the SBOM was persisted to the storage backend.
39+
40+
---
41+
42+
## 3. Endpoint Object
43+
44+
Defines the deployment target for MTTR and lifecycle tracking.
45+
46+
* **name** (string, Required): Unique name of the environment or cluster (e.g., "prod-us-east-1").
47+
* **endpoint_type** (string, Required): The infrastructure category. Supported types include `eks`, `lambda`, `gke`, `fargate`, `iot`, and `mission_asset`.
48+
* **environment** (string, Required): The environment designation (e.g., `production`, `staging`).
49+
* **is_public** (boolean, Default: `true`): Visibility flag for the endpoint.
50+
51+
---
52+
53+
## Example JSON Payload
54+
55+
```json
56+
{
57+
"event_type": "release.sync.created",
58+
"event_id": "550e8400-e29b-41d4-a716-446655440000",
59+
"event_time": "2023-10-27T10:00:00Z",
60+
"synced_at": "2023-10-27T09:55:00Z",
61+
"release": {
62+
"name": "ortelius/reporting-service",
63+
"version": "v1.2.3",
64+
"projecttype": "docker",
65+
"gitcommit": "af32c1b",
66+
"dockersha": "sha256:45b34006...77a",
67+
"is_public": true
68+
},
69+
"sbom_ref": {
70+
"cid": "QmXoypizjW3WknFiJnKLwHCnL72vedxjQkDDP1mXWo6uco",
71+
"storage_type": "ipfs",
72+
"content_sha": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
73+
"uploaded_at": "2023-10-27T09:50:00Z"
74+
},
75+
"endpoint": {
76+
"name": "production-cluster-01",
77+
"endpoint_type": "eks",
78+
"environment": "production",
79+
"is_public": true
80+
}
81+
}
82+
```

events/kafka.json

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
{
2+
"$schema": "http://json-schema.org/draft-07/schema#",
3+
"type": "object",
4+
"properties": {
5+
"event_type": {
6+
"type": "string",
7+
"description": "Type of event, e.g., 'release.sync.created'"
8+
},
9+
"event_id": {
10+
"type": "string",
11+
"format": "uuid"
12+
},
13+
"event_time": {
14+
"type": "string",
15+
"format": "date-time"
16+
},
17+
"release": {
18+
"type": "object",
19+
"description": "The ProjectRelease metadata",
20+
"properties": {
21+
"name": { "type": "string" },
22+
"version": { "type": "string" },
23+
"projecttype": { "type": "string" },
24+
"gitcommit": { "type": "string" },
25+
"dockersha": { "type": "string" },
26+
"is_public": { "type": "boolean", "default": true }
27+
},
28+
"required": ["name", "version"]
29+
},
30+
"sbom_ref": {
31+
"type": "object",
32+
"description": "Reference to the stored SBOM content",
33+
"properties": {
34+
"cid": { "type": "string", "description": "IPFS CID of the SBOM content" },
35+
"storage_type": { "type": "string", "enum": ["ipfs", "s3"] },
36+
"content_sha": { "type": "string" },
37+
"uploaded_at": { "type": "string", "format": "date-time" }
38+
},
39+
"required": ["cid", "storage_type"]
40+
},
41+
"endpoint": {
42+
"type": "object",
43+
"description": "The deployment target (Endpoint) metadata",
44+
"properties": {
45+
"name": { "type": "string" },
46+
"endpoint_type": {
47+
"type": "string",
48+
"enum": ["cluster", "ec2", "lambda", "ecs", "eks", "gke", "aks", "fargate", "edge", "iot", "mission_asset"]
49+
},
50+
"environment": { "type": "string" },
51+
"is_public": { "type": "boolean", "default": true }
52+
},
53+
"required": ["name", "endpoint_type", "environment"]
54+
},
55+
"synced_at": {
56+
"type": "string",
57+
"format": "date-time",
58+
"description": "Timestamp of the deployment sync"
59+
}
60+
},
61+
"required": [
62+
"event_type",
63+
"event_id",
64+
"event_time",
65+
"release",
66+
"sbom_ref",
67+
"endpoint"
68+
]
69+
}

events/modules/releases/handler.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package release
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
9+
"github.com/ortelius/pdvd-backend/v12/model"
10+
)
11+
12+
type SBOMFetcher interface {
13+
FetchSBOM(ctx context.Context, cid string) ([]byte, error)
14+
}
15+
16+
type ReleaseService interface {
17+
CreateRelease(ctx context.Context, release model.ReleaseWithSBOM) error
18+
}
19+
20+
func HandleReleaseSBOMCreatedWithService(
21+
ctx context.Context,
22+
msg []byte,
23+
fetcher SBOMFetcher,
24+
service ReleaseService,
25+
) error {
26+
var event ReleaseSBOMCreatedEvent
27+
if err := json.Unmarshal(msg, &event); err != nil {
28+
return fmt.Errorf("failed to unmarshal ReleaseSBOMCreatedEvent: %w", err)
29+
}
30+
31+
if event.Release.Name == "" || event.Release.Version == "" || event.SBOMRef.CID == "" {
32+
return fmt.Errorf("invalid event: missing required fields")
33+
}
34+
35+
log.Printf("Processing release %s@%s (CID=%s)", event.Release.Name, event.Release.Version, event.SBOMRef.CID)
36+
37+
sbomData, err := fetcher.FetchSBOM(ctx, event.SBOMRef.CID)
38+
if err != nil {
39+
return fmt.Errorf("failed to fetch SBOM for CID %s: %w", event.SBOMRef.CID, err)
40+
}
41+
42+
releaseWithSBOM := model.ReleaseWithSBOM{
43+
ProjectRelease: model.ProjectRelease{
44+
Name: event.Release.Name,
45+
Version: event.Release.Version,
46+
GitCommit: event.Release.GitCommit,
47+
DockerSha: event.Release.DockerSha,
48+
ProjectType: event.Release.ProjectType,
49+
IsPublic: event.Release.IsPublic,
50+
},
51+
SBOM: model.SBOM{
52+
Content: sbomData,
53+
},
54+
}
55+
56+
releaseWithSBOM.ParseAndSetNameComponents()
57+
releaseWithSBOM.ParseAndSetVersion()
58+
59+
if err := service.CreateRelease(ctx, releaseWithSBOM); err != nil {
60+
return fmt.Errorf("internal service error: %w", err)
61+
}
62+
63+
log.Printf("Successfully processed release %s@%s", event.Release.Name, event.Release.Version)
64+
return nil
65+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package release
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
"github.com/ortelius/pdvd-backend/v12/model"
10+
"github.com/segmentio/kafka-go"
11+
)
12+
13+
// ReleaseProducer handles sending SBOM creation events to Kafka
14+
type ReleaseProducer struct {
15+
Writer *kafka.Writer
16+
}
17+
18+
// NewReleaseProducer initializes a new Kafka writer for release events
19+
func NewReleaseProducer(brokers []string, topic string) *ReleaseProducer {
20+
return &ReleaseProducer{
21+
Writer: &kafka.Writer{
22+
Addr: kafka.TCP(brokers...),
23+
Topic: topic,
24+
Balancer: &kafka.LeastBytes{},
25+
},
26+
}
27+
}
28+
29+
// PublishReleaseSBOMCreated sends the event to the Kafka topic
30+
func (p *ReleaseProducer) PublishReleaseSBOMCreated(ctx context.Context, release model.ProjectRelease, cid string) error {
31+
32+
// Construct the Event Contract
33+
event := ReleaseSBOMCreatedEvent{
34+
EventType: "release.sbom.created",
35+
EventID: uuid.New().String(),
36+
EventTime: time.Now().UTC(),
37+
SchemaVersion: "v1",
38+
Release: release,
39+
SBOMRef: SBOMReference{
40+
CID: cid,
41+
StorageType: "ipfs", // Default storage type for the system
42+
UploadedAt: time.Now().UTC(),
43+
},
44+
}
45+
46+
// Marshal to JSON
47+
payload, err := json.Marshal(event)
48+
if err != nil {
49+
return err
50+
}
51+
52+
// Write to Kafka
53+
return p.Writer.WriteMessages(ctx, kafka.Message{
54+
Key: []byte(release.Name),
55+
Value: payload,
56+
})
57+
}
58+
59+
// Close cleans up the Kafka writer
60+
func (p *ReleaseProducer) Close() error {
61+
return p.Writer.Close()
62+
}

events/modules/releases/types.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package release
2+
3+
import (
4+
"time"
5+
6+
"github.com/ortelius/pdvd-backend/v12/model"
7+
)
8+
9+
type ReleaseSBOMCreatedEvent struct {
10+
EventType string `json:"event_type"`
11+
EventID string `json:"event_id"`
12+
EventTime time.Time `json:"event_time"`
13+
SchemaVersion string `json:"schema_version"`
14+
15+
Release model.ProjectRelease `json:"release"`
16+
17+
SBOMRef SBOMReference `json:"sbom_ref"`
18+
}
19+
20+
// SBOMReference describes where an SBOM is stored and how it can be retrieved.
21+
type SBOMReference struct {
22+
// IPFS CID returned by the SBOM ingestion Lambda
23+
CID string `json:"cid"`
24+
25+
// Storage backend identifier (e.g. "ipfs+s3", "ipfs")
26+
StorageType string `json:"storage_type"`
27+
28+
// Optional S3 details
29+
Bucket string `json:"bucket,omitempty"`
30+
ObjectKey string `json:"object_key,omitempty"`
31+
32+
// Optional integrity metadata
33+
ContentSha string `json:"content_sha,omitempty"`
34+
SizeBytes int64 `json:"size_bytes,omitempty"`
35+
36+
// Timestamp when the SBOM was persisted
37+
UploadedAt time.Time `json:"uploaded_at"`
38+
}

go.mod

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ require (
99
github.com/arangodb/go-driver/v2 v2.1.6
1010
github.com/cenkalti/backoff v2.2.1+incompatible
1111
github.com/go-git/go-git/v5 v5.16.4
12-
github.com/gofiber/fiber/v2 v2.52.10
13-
github.com/golang-jwt/jwt/v5 v5.3.0
12+
github.com/gofiber/fiber/v2 v2.52.11
13+
github.com/golang-jwt/jwt/v5 v5.3.1
1414
github.com/google/osv-scanner v1.9.2
15+
github.com/google/uuid v1.6.0
1516
github.com/graphql-go/graphql v0.8.1
1617
github.com/package-url/packageurl-go v0.1.3
1718
github.com/pandatix/go-cvss v0.6.2
19+
github.com/segmentio/kafka-go v0.4.50
1820
go.uber.org/zap v1.27.1
1921
golang.org/x/crypto v0.47.0
2022
gopkg.in/yaml.v2 v2.4.0
@@ -28,8 +30,8 @@ require (
2830
github.com/aquasecurity/go-version v0.0.1 // indirect
2931
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect
3032
github.com/clipperhouse/stringish v0.1.1 // indirect
31-
github.com/clipperhouse/uax29/v2 v2.3.0 // indirect
32-
github.com/cloudflare/circl v1.6.2 // indirect
33+
github.com/clipperhouse/uax29/v2 v2.5.0 // indirect
34+
github.com/cloudflare/circl v1.6.3 // indirect
3335
github.com/cyphar/filepath-securejoin v0.6.1 // indirect
3436
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3537
github.com/dchest/siphash v1.2.3 // indirect
@@ -38,15 +40,15 @@ require (
3840
github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect
3941
github.com/go-git/go-billy/v5 v5.7.0 // indirect
4042
github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
41-
github.com/google/uuid v1.6.0 // indirect
4243
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect
4344
github.com/kevinburke/ssh_config v1.4.0 // indirect
4445
github.com/kkdai/maglev v0.2.0 // indirect
45-
github.com/klauspost/compress v1.18.2 // indirect
46+
github.com/klauspost/compress v1.18.3 // indirect
4647
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
4748
github.com/mattn/go-colorable v0.1.14 // indirect
4849
github.com/mattn/go-isatty v0.0.20 // indirect
4950
github.com/mattn/go-runewidth v0.0.19 // indirect
51+
github.com/pierrec/lz4/v4 v4.1.25 // indirect
5052
github.com/pjbgf/sha1cd v0.5.0 // indirect
5153
github.com/pkg/errors v0.9.1 // indirect
5254
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect

0 commit comments

Comments
 (0)