Skip to content

Commit c4a93a1

Browse files
committed
feat(pipeline): implement FHIR validation pipeline step
Add a new validation step that validates FHIR resources against an external FHIR validation service via HTTP POST to /validateResource. The step produces per-file OperationOutcome reports in the validation/ directory and fails if any resource has error-level or fatal-level issues. It supports configurable concurrent validation requests and resumption by skipping files that already have reports. Validation is a "transparent" step — it doesn't produce FHIR data output, so subsequent pipeline steps look past it to find their input from the previous data-producing step. Closes #7
1 parent 454b8b0 commit c4a93a1

File tree

13 files changed

+1382
-17
lines changed

13 files changed

+1382
-17
lines changed

cmd/pipeline.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,20 @@ func executeStep(job *models.PipelineJob, stepName models.StepName, config *mode
223223
return nil
224224

225225
case models.StepValidation:
226-
fmt.Println("Validation step not yet implemented - job will remain at this step")
226+
fmt.Println("Starting validation step...")
227+
if err := pipeline.ExecuteValidationStep(job, jobDir, logger); err != nil {
228+
failedJob := pipeline.FailJob(job, err.Error())
229+
if saveErr := pipeline.UpdateJob(config.JobsDir, failedJob); saveErr != nil {
230+
logger.Error("Failed to save job state", "error", saveErr)
231+
}
232+
return fmt.Errorf("validation step failed: %w", err)
233+
}
234+
235+
if err := pipeline.UpdateJob(config.JobsDir, job); err != nil {
236+
return fmt.Errorf("failed to save job state: %w", err)
237+
}
238+
239+
fmt.Printf("\n✓ Validation completed\n")
227240
return nil
228241

229242
case models.StepCSVConversion:

internal/models/config.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func DefaultCompressionConfig() CompressionConfig {
4040
// ServiceConfig contains connection details for external HTTP services
4141
type ServiceConfig struct {
4242
DIMP DIMPConfig `yaml:"dimp" json:"dimp"`
43+
Validation ValidationConfig `yaml:"validation" json:"validation" mapstructure:"validation"`
4344
CSVConversion CSVConversionConfig `yaml:"csv_conversion" json:"csv_conversion"`
4445
ParquetConversion ParquetConversionConfig `yaml:"parquet_conversion" json:"parquet_conversion"`
4546
TORCH TORCHConfig `yaml:"torch" json:"torch"`
@@ -48,6 +49,12 @@ type ServiceConfig struct {
4849
LocalImport LocalImportConfig `yaml:"local_import" json:"local_import" mapstructure:"local_import"`
4950
}
5051

52+
// ValidationConfig contains FHIR validation service settings
53+
type ValidationConfig struct {
54+
URL string `yaml:"url" json:"url" mapstructure:"url"`
55+
MaxConcurrentRequests int `yaml:"max_concurrent_requests" json:"max_concurrent_requests" mapstructure:"max_concurrent_requests"`
56+
}
57+
5158
// LocalImportConfig contains settings for local directory import
5259
type LocalImportConfig struct {
5360
Dir string `yaml:"dir" json:"dir" mapstructure:"dir"` // Default directory path for local imports
@@ -371,6 +378,8 @@ func (c *ServiceConfig) HasServiceURL(step StepName) bool {
371378
switch step {
372379
case StepDIMP:
373380
return c.DIMP.URL != ""
381+
case StepValidation:
382+
return c.Validation.URL != ""
374383
case StepCSVConversion:
375384
return c.CSVConversion.URL != ""
376385
case StepParquetConversion:
@@ -380,7 +389,7 @@ func (c *ServiceConfig) HasServiceURL(step StepName) bool {
380389
case StepSend:
381390
return c.Send.IsConfigured()
382391
default:
383-
return true // Import and validation don't require external services
392+
return true // Import steps don't require external services
384393
}
385394
}
386395

@@ -389,6 +398,8 @@ func (c *ServiceConfig) GetServiceURL(step StepName) string {
389398
switch step {
390399
case StepDIMP:
391400
return c.DIMP.URL
401+
case StepValidation:
402+
return c.Validation.URL
392403
case StepCSVConversion:
393404
return c.CSVConversion.URL
394405
case StepParquetConversion:

internal/models/validation.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (c *ProjectConfig) Validate() error {
162162
for _, step := range c.Pipeline.EnabledSteps {
163163
if !c.Services.HasServiceURL(step) {
164164
switch step {
165-
case StepDIMP, StepCSVConversion, StepParquetConversion, StepSend:
165+
case StepDIMP, StepValidation, StepCSVConversion, StepParquetConversion, StepSend:
166166
return fmt.Errorf("service URL required for enabled step '%s'", step)
167167
}
168168
}
@@ -190,6 +190,11 @@ func (c *ProjectConfig) Validate() error {
190190
return fmt.Errorf("invalid dimp url: %w", err)
191191
}
192192
}
193+
if c.Services.Validation.URL != "" {
194+
if _, err := url.Parse(c.Services.Validation.URL); err != nil {
195+
return fmt.Errorf("invalid validation url: %w", err)
196+
}
197+
}
193198
if c.Services.CSVConversion.URL != "" {
194199
if _, err := url.Parse(c.Services.CSVConversion.URL); err != nil {
195200
return fmt.Errorf("invalid csv_conversion url: %w", err)
@@ -296,6 +301,9 @@ func (c *ProjectConfig) ValidateServiceConnectivity(transport *http.Transport) e
296301
case StepDIMP:
297302
serviceURL = c.Services.DIMP.URL
298303
serviceName = "DIMP"
304+
case StepValidation:
305+
serviceURL = c.Services.Validation.URL
306+
serviceName = "Validation"
299307
case StepSend:
300308
serviceURL = c.Services.GetServiceURL(StepSend)
301309
serviceName = "Send"

0 commit comments

Comments
 (0)