Skip to content

Commit a4d1ad0

Browse files
authored
feat: reimplement the exporter in Go (google#4197)
More database migration! Reimplement the exporter in Go, reading from the GCS proto files instead of from the datastore Bugs. I've made the whole thing more parallel and all in-memory, which should be a pretty decent performance improvement. Testing is currently missing - we need a way to mock the GCS buckets.
1 parent 1d9b902 commit a4d1ad0

File tree

13 files changed

+669
-21
lines changed

13 files changed

+669
-21
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ consists of:
4545
| `gcp/functions` | The Cloud Function for publishing PyPI vulnerabilities (maintained, but not developed) |
4646
| `gcp/indexer` | The determine version `indexer` |
4747
| `gcp/website` | The backend of the osv.dev web interface, with the frontend in `frontend3` <br /> Blog posts (in `blog`) |
48-
| `gcp/workers/` | Workers for bisection and impact analysis (`worker`, `importer`, `exporter`, `alias`) <br /> `cron/` jobs for database backups and processing oss-fuzz records |
48+
| `gcp/workers/` | Workers for bisection and impact analysis (`worker`, `importer`, `alias`) <br /> `cron/` jobs for database backups and processing oss-fuzz records |
49+
| `go/` | Go module for shared libraries and commands (`cmd/exporter`, `cmd/recordchecker`) |
4950
| `osv/` | The core OSV Python library, used in basically all Python services <br /> OSV ecosystem package versioning helpers in `ecosystems/` <br /> Datastore model definitions in `models.py` |
5051
| `tools/` | Misc scripts/tools, mostly intended for development (datastore stuff, linting) <br /> The `indexer-api-caller` for indexer calling |
5152
| `vulnfeeds/` | Go module for (mostly) the NVD CVE conversion <br /> The Alpine feed converter (`cmd/alpine`) <br /> The Debian feed converter (`tools/debian`, which is written in Python) |

deployment/build-and-stage.yaml

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ steps:
6262
args: ['push', '--all-tags', 'gcr.io/oss-vdb/worker-base']
6363
waitFor: ['build-worker-base', 'cloud-build-queue']
6464

65-
# Build/push core worker/importer/exporter/alias images.
65+
# Build/push core worker/importer/alias images.
6666
- name: gcr.io/cloud-builders/docker
6767
args: ['build', '-t', 'gcr.io/oss-vdb/worker:latest', '-t', 'gcr.io/oss-vdb/worker:$COMMIT_SHA', '-f', 'gcp/workers/worker/Dockerfile', '.']
6868
id: 'build-worker'
@@ -80,15 +80,6 @@ steps:
8080
args: ['push', '--all-tags', 'gcr.io/oss-vdb/importer']
8181
waitFor: ['build-importer', 'cloud-build-queue']
8282

83-
- name: gcr.io/cloud-builders/docker
84-
args: ['build', '-t', 'gcr.io/oss-vdb/exporter:latest', '-t', 'gcr.io/oss-vdb/exporter:$COMMIT_SHA', '.']
85-
dir: 'gcp/workers/exporter'
86-
id: 'build-exporter'
87-
waitFor: ['build-worker']
88-
- name: gcr.io/cloud-builders/docker
89-
args: ['push', '--all-tags', 'gcr.io/oss-vdb/exporter']
90-
waitFor: ['build-exporter', 'cloud-build-queue']
91-
9283
- name: gcr.io/cloud-builders/docker
9384
args: ['build', '-t', 'gcr.io/oss-vdb/alias-computation:latest', '-t', 'gcr.io/oss-vdb/alias-computation:$COMMIT_SHA', '.']
9485
dir: 'gcp/workers/alias'
@@ -107,6 +98,21 @@ steps:
10798
args: ['push', '--all-tags', 'gcr.io/oss-vdb/recoverer']
10899
waitFor: ['build-recoverer', 'cloud-build-queue']
109100

101+
# Build/push exporter/record-checker go images
102+
- name: 'gcr.io/cloud-builders/docker'
103+
entrypoint: 'bash'
104+
args: ['-c', 'docker pull gcr.io/oss-vdb/exporter:latest || exit 0']
105+
id: 'pull-exporter'
106+
waitFor: ['setup']
107+
- name: gcr.io/cloud-builders/docker
108+
args: ['build', '-t', 'gcr.io/oss-vdb/exporter:latest', '-t', 'gcr.io/oss-vdb/exporter:$COMMIT_SHA', '-f', 'cmd/exporter/Dockerfile', '--cache-from', 'gcr.io/oss-vdb/exporter:latest', '--pull', '.']
109+
dir: 'go'
110+
id: 'build-exporter'
111+
waitFor: ['pull-exporter']
112+
- name: gcr.io/cloud-builders/docker
113+
args: ['push', '--all-tags', 'gcr.io/oss-vdb/exporter']
114+
waitFor: ['build-exporter', 'cloud-build-queue']
115+
110116
- name: 'gcr.io/cloud-builders/docker'
111117
entrypoint: 'bash'
112118
args: ['-c', 'docker pull gcr.io/oss-vdb/record-checker:latest || exit 0']

deployment/clouddeploy/gke-workers/base/exporter.yaml

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,11 @@ spec:
2424
- name: exporter
2525
image: exporter
2626
imagePullPolicy: Always
27-
volumeMounts:
28-
- mountPath: "/work"
29-
name: "ssd"
3027
resources:
3128
requests:
3229
cpu: "12"
33-
memory: "48G"
30+
memory: "48Gi"
3431
limits:
3532
cpu: "24"
36-
memory: "128G"
33+
memory: "128Gi"
3734
restartPolicy: Never
38-
volumes:
39-
- name: "ssd"
40-
emptyDir:
41-
sizeLimit: 60Gi

deployment/clouddeploy/gke-workers/environments/oss-vdb-test/exporter.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ spec:
1414
value: oss-vdb-test
1515
args:
1616
# TODO(michaelkedar): single source of truth w/ terraform config
17+
- "--uploadToGCS=true"
1718
- "--bucket=osv-test-vulnerabilities"
19+
- "--osv_vulns_bucket=osv-test-vulnerabilities"
20+

deployment/clouddeploy/gke-workers/environments/oss-vdb/exporter.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,7 @@ spec:
1313
- name: GOOGLE_CLOUD_PROJECT
1414
value: oss-vdb
1515
args:
16+
- "--uploadToGCS=true"
1617
- "--bucket=osv-vulnerabilities"
18+
- "--osv_vulns_bucket=osv-vulnerabilities"
19+

go/cmd/exporter/Dockerfile

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Build stage: compile the exporter from the repo's go/ module.
# The base image is pinned by digest for reproducible builds.
FROM golang:1.25.3-alpine@sha256:aee43c3ccbf24fdffb7295693b6e33b21e01baec1b2a55acc351fde345e9ec34 AS build

WORKDIR /src

# Copy module files first so the dependency download layer is cached
# independently of source-code changes.
COPY ./go.mod /src/go.mod
COPY ./go.sum /src/go.sum
RUN go mod download && go mod verify


COPY ./ /src/
# CGO_ENABLED=0 yields a fully static binary, required for distroless/static.
RUN CGO_ENABLED=0 go build -o exporter ./cmd/exporter

# Runtime stage: minimal distroless image containing only the binary
# (no shell, no package manager), also pinned by digest.
FROM gcr.io/distroless/static-debian12@sha256:87bce11be0af225e4ca761c40babb06d6d559f5767fbf7dc3c47f0f1a466b92c

COPY --from=build /src/exporter /

ENTRYPOINT ["/exporter"]

go/cmd/exporter/README.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# OSV Exporter
2+
3+
The exporter is responsible for creating many files in the OSV vulnerabilities bucket from the canonical protobuf vulnerability format.
4+
5+
The generated files are:
6+
- `[ECOSYSTEM]/VULN-ID.json` - OSV JSON file for each vulnerability in each ecosystem
7+
- `[ECOSYSTEM]/all.zip` - contains each OSV JSON file for that ecosystem
8+
- `[ECOSYSTEM]/modified_id.csv` - contains the (modified, ID) of each vulnerability in the ecosystem directory
9+
- `/ecosystems.txt` - a line-separated list of each exported ecosystem
10+
- `/all.zip` - contains every OSV JSON file across all ecosystems
11+
- `/modified_id.csv` - the (modified, [ECOSYSTEM]/ID) of every vulnerability across all ecosystem directories
12+
- `GIT/osv_git.json` - a JSON array of every OSV vulnerability that has Vanir signatures.
13+
14+
## Running locally
15+
16+
To run the exporter locally, run the exporter from within the `go/cmd/exporter` directory, providing the GCS bucket containing the vulnerability protobufs via the `-osv_vulns_bucket` flag.
17+
18+
```sh
19+
# Example
20+
go run . -osv_vulns_bucket osv-test-vulnerabilities -uploadToGCS=false -bucket /tmp/osv-export
21+
```
22+
23+
This will write the exported files to the `/tmp/osv-export` directory.
24+
25+
Note that running this takes quite a long time and uses a lot of memory.
26+
27+
### Flags
28+
29+
- `-bucket`: Output bucket or directory name. If `-uploadToGCS` is false, this is a local path; otherwise, it's a GCS bucket name.
30+
- `-osv_vulns_bucket`: GCS bucket to read vulnerability protobufs from. Can also be set with the `OSV_VULNERABILITIES_BUCKET` environment variable.
31+
- `-uploadToGCS`: If false, writes the output to a local directory specified by `-bucket` instead of a GCS bucket.
32+
- `-num_workers`: The total number of concurrent workers to use for downloading from GCS and writing the output.

go/cmd/exporter/downloader.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"io"
6+
"log/slog"
7+
"sync"
8+
9+
"cloud.google.com/go/storage"
10+
"github.com/google/osv.dev/go/logger"
11+
"github.com/ossf/osv-schema/bindings/go/osvschema"
12+
"google.golang.org/protobuf/proto"
13+
)
14+
15+
// downloader is a worker that receives GCS object handles from inCh, downloads
16+
// the raw protobuf data, unmarshals it into a Vulnerability, and sends the
17+
// result to outCh.
18+
func downloader(ctx context.Context, inCh <-chan *storage.ObjectHandle, outCh chan<- *osvschema.Vulnerability, wg *sync.WaitGroup) {
19+
defer wg.Done()
20+
for {
21+
var obj *storage.ObjectHandle
22+
var ok bool
23+
24+
// Wait to receive an object, or be cancelled.
25+
select {
26+
case obj, ok = <-inCh:
27+
if !ok {
28+
return // Channel closed.
29+
}
30+
case <-ctx.Done():
31+
return
32+
}
33+
34+
// Process object.
35+
r, err := obj.NewReader(ctx)
36+
if err != nil {
37+
logger.Error("failed to open vulnerability", slog.String("obj", obj.ObjectName()), slog.Any("err", err))
38+
continue
39+
}
40+
data, err := io.ReadAll(r)
41+
r.Close()
42+
if err != nil {
43+
logger.Error("failed to read vulnerability", slog.String("obj", obj.ObjectName()), slog.Any("err", err))
44+
continue
45+
}
46+
vuln := &osvschema.Vulnerability{}
47+
if err := proto.Unmarshal(data, vuln); err != nil {
48+
logger.Error("failed to unmarshal vulnerability", slog.String("obj", obj.ObjectName()), slog.Any("err", err))
49+
continue
50+
}
51+
52+
// Wait to send the result, or be cancelled.
53+
select {
54+
case outCh <- vuln:
55+
case <-ctx.Done():
56+
return
57+
}
58+
}
59+
}

go/cmd/exporter/exporter.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
// Package main runs the exporter, exporting the whole OSV database to the GCS bucket.
2+
// See the README.md for more details.
3+
package main
4+
5+
import (
6+
"context"
7+
"errors"
8+
"flag"
9+
"log/slog"
10+
"os"
11+
"path/filepath"
12+
"strings"
13+
"sync"
14+
15+
"cloud.google.com/go/storage"
16+
"github.com/google/osv.dev/go/logger"
17+
"github.com/ossf/osv-schema/bindings/go/osvschema"
18+
"google.golang.org/api/iterator"
19+
)
20+
21+
const gcsProtoPrefix = "all/pb/"
22+
23+
// main is the entry point for the exporter. It initializes the GCS clients,
24+
// sets up the worker pipeline, and starts the GCS object iteration.
25+
func main() {
26+
logger.InitGlobalLogger()
27+
28+
outBucketName := flag.String("bucket", "osv-test-vulnerabilities", "Output bucket or directory name. If -local is true, this is a local path; otherwise, it's a GCS bucket name.")
29+
vulnBucketName := flag.String("osv_vulns_bucket", os.Getenv("OSV_VULNERABILITIES_BUCKET"), "GCS bucket to read vulnerability protobufs from. Can also be set with the OSV_VULNERABILITIES_BUCKET environment variable.")
30+
uploadToGCS := flag.Bool("uploadToGCS", false, "If false, writes the output to a local directory specified by -bucket instead of a GCS bucket.")
31+
numWorkers := flag.Int("num_workers", 200, "The total number of concurrent workers to use for downloading from GCS and writing the output.")
32+
33+
flag.Parse()
34+
35+
logger.Info("exporter starting",
36+
slog.String("bucket", *outBucketName),
37+
slog.String("osv_vulns_bucket", *vulnBucketName),
38+
slog.Bool("uploadToGCS", *uploadToGCS),
39+
slog.Int("num_workers", *numWorkers))
40+
41+
if *vulnBucketName == "" {
42+
logger.Fatal("OSV_VULNERABILITIES_BUCKET must be set")
43+
}
44+
45+
ctx, cancel := context.WithCancel(context.Background())
46+
defer cancel()
47+
48+
cl, err := storage.NewClient(ctx)
49+
if err != nil {
50+
logger.Fatal("failed to create storage client", slog.Any("err", err))
51+
}
52+
53+
vulnBucket := cl.Bucket(*vulnBucketName)
54+
var outBucket *storage.BucketHandle
55+
var outPrefix string
56+
if *uploadToGCS {
57+
outBucket = cl.Bucket(*outBucketName)
58+
} else {
59+
outPrefix = *outBucketName
60+
}
61+
62+
// The exporter uses a pipeline of channels and worker pools. The data flow is as follows:
63+
// 1. The main goroutine lists GCS objects and sends them to `gcsObjToDownloaderCh`.
64+
// 2. A pool of `downloader` workers receive GCS objects, downloads and unmarshals them into
65+
// OSV vulnerabilities, and send them to `downloaderToRouterCh`.
66+
// 3. The `ecosystemRouter` receives vulnerabilities and dispatches them. It creates a new
67+
// `ecosystemWorker` for each new ecosystem, and sends all vulnerabilities to a single
68+
// `allEcosystemWorker`.
69+
// 4. The `ecosystemWorker`s and the `allEcosystemWorker` process the vulnerabilities and
70+
// generate the final files, sending the data to be written to `routerToWriteCh`.
71+
// 5. A pool of `writer` workers receive the file data and write it to the output.
72+
gcsObjToDownloaderCh := make(chan *storage.ObjectHandle)
73+
downloaderToRouterCh := make(chan *osvschema.Vulnerability)
74+
routerToWriteCh := make(chan writeMsg)
75+
76+
var downloaderWg sync.WaitGroup
77+
for range *numWorkers / 2 {
78+
downloaderWg.Add(1)
79+
go downloader(ctx, gcsObjToDownloaderCh, downloaderToRouterCh, &downloaderWg)
80+
}
81+
82+
var writerWg sync.WaitGroup
83+
for range *numWorkers / 2 {
84+
writerWg.Add(1)
85+
go writer(ctx, cancel, routerToWriteCh, outBucket, outPrefix, &writerWg)
86+
}
87+
var routerWg sync.WaitGroup
88+
routerWg.Add(1)
89+
go ecosystemRouter(ctx, downloaderToRouterCh, routerToWriteCh, &routerWg)
90+
91+
it := vulnBucket.Objects(ctx, &storage.Query{Prefix: gcsProtoPrefix})
92+
prevPrefix := ""
93+
MainLoop:
94+
for {
95+
attrs, err := it.Next()
96+
if errors.Is(err, iterator.Done) {
97+
break
98+
}
99+
if err != nil {
100+
logger.Fatal("failed to list objects", slog.Any("err", err))
101+
}
102+
// Only log when we see a new ID prefix (i.e. roughly once per data source)
103+
prefix := filepath.Base(attrs.Name)
104+
prefix, _, _ = strings.Cut(prefix, "-")
105+
if prefix != prevPrefix {
106+
logger.Info("iterating vulnerabilities", slog.String("now_at", attrs.Name))
107+
prevPrefix = prefix
108+
}
109+
select {
110+
case gcsObjToDownloaderCh <- vulnBucket.Object(attrs.Name):
111+
case <-ctx.Done():
112+
break MainLoop
113+
}
114+
}
115+
116+
close(gcsObjToDownloaderCh)
117+
downloaderWg.Wait()
118+
close(downloaderToRouterCh)
119+
routerWg.Wait()
120+
close(routerToWriteCh)
121+
writerWg.Wait()
122+
123+
if ctx.Err() != nil {
124+
logger.Fatal("exporter cancelled")
125+
}
126+
logger.Info("export completed successfully")
127+
}
128+
129+
// ecosystemRouter receives vulnerabilities from inCh and fans them out to the
// appropriate ecosystemWorker. It creates workers on-demand for each new
// ecosystem encountered. It also sends every vulnerability to the allEcosystemWorker.
func ecosystemRouter(ctx context.Context, inCh <-chan *osvschema.Vulnerability, outCh chan<- writeMsg, wg *sync.WaitGroup) {
	defer wg.Done()
	logger.Info("ecosystem router starting")
	// Per-ecosystem workers, created lazily on first sighting of an ecosystem.
	workers := make(map[string]*ecosystemWorker)
	// workersWg is shared by every ecosystemWorker and the allEcosystemWorker;
	// it is waited on after all of them are told to Finish.
	var workersWg sync.WaitGroup
	vulnCounter := 0

	allEcosystemWorker := newAllEcosystemWorker(ctx, outCh, &workersWg)

	for vuln := range inCh {
		vulnCounter++
		// Compute the set of ecosystems this vulnerability belongs to.
		ecosystems := make(map[string]struct{})
		for _, aff := range vuln.GetAffected() {
			eco := aff.GetPackage().GetEcosystem()
			// Keep only the base ecosystem name before any ":" suffix
			// (e.g. "Debian:12" is routed to "Debian").
			eco, _, _ = strings.Cut(eco, ":")
			if eco != "" {
				ecosystems[eco] = struct{}{}
			}
			// Any GIT-typed range also places the vulnerability in the
			// synthetic "GIT" ecosystem.
			for _, ref := range aff.GetRanges() {
				if ref.GetType() == osvschema.Range_GIT {
					ecosystems["GIT"] = struct{}{}
				}
			}
		}
		// Vulnerabilities with no recognizable ecosystem are grouped
		// under the placeholder "[EMPTY]".
		if len(ecosystems) == 0 {
			ecosystems["[EMPTY]"] = struct{}{}
		}
		ecoNames := make([]string, 0, len(ecosystems))
		for eco := range ecosystems {
			ecoNames = append(ecoNames, eco)
			worker, ok := workers[eco]
			if !ok {
				// First vulnerability for this ecosystem: start its worker.
				worker = newEcosystemWorker(ctx, eco, outCh, &workersWg)
				workers[eco] = worker
			}
			// NOTE(review): this send does not select on ctx.Done(); it assumes
			// ecosystemWorker keeps draining inCh until Finish even when the
			// context is cancelled — verify to rule out a shutdown deadlock.
			worker.inCh <- vuln
		}
		allEcosystemWorker.inCh <- vulnAndEcos{Vulnerability: vuln, ecosystems: ecoNames}
	}

	// inCh is closed: tell every worker to finalize its output, then wait for
	// them all so the caller (main) can safely close routerToWriteCh.
	for _, worker := range workers {
		worker.Finish()
	}
	allEcosystemWorker.Finish()
	workersWg.Wait()
	logger.Info("ecosystem router finished, all vulnerabilities dispatched", slog.Int("total_vulnerabilities", vulnCounter))
}

0 commit comments

Comments
 (0)