11import type { Dataset , DatasetRow } from "@domain/datasets"
22import { DatasetRepository , createDataset , insertRows , listDatasets , listRows } from "@domain/datasets"
33import { DatasetId , DatasetVersionId , OrganizationId , ProjectId , putInDisk } from "@domain/shared"
4- import { DatasetRowRepositoryLive } from "@platform/db-clickhouse"
5- import { DatasetRepositoryLive , SqlClientLive } from "@platform/db-postgres"
4+ import { DatasetRowRepositoryLive , withClickHouse } from "@platform/db-clickhouse"
5+ import { DatasetRepositoryLive , withPostgres } from "@platform/db-postgres"
66import { createServerFn } from "@tanstack/react-start"
77import { Effect } from "effect"
88import Papa from "papaparse"
@@ -127,17 +127,15 @@ export const listDatasetsQuery = createServerFn({ method: "GET" })
127127 . inputValidator ( z . object ( { projectId : z . string ( ) } ) )
128128 . handler ( async ( { data } ) : Promise < { datasets : DatasetRecord [ ] ; total : number } > => {
129129 const { organizationId } = await requireSession ( )
130- const client = getPostgresClient ( )
131- const chClient = getClickhouseClient ( )
130+ const orgId = OrganizationId ( organizationId )
132131
133132 const result = await Effect . runPromise (
134133 listDatasets ( {
135- organizationId : OrganizationId ( organizationId ) ,
134+ organizationId : orgId ,
136135 projectId : ProjectId ( data . projectId ) ,
137136 } ) . pipe (
138- Effect . provide ( DatasetRepositoryLive ) ,
139- Effect . provide ( DatasetRowRepositoryLive ( chClient ) ) ,
140- Effect . provide ( SqlClientLive ( client , OrganizationId ( organizationId ) ) ) ,
137+ Effect . provide ( withPostgres ( getPostgresClient ( ) , orgId , DatasetRepositoryLive ) ) ,
138+ Effect . provide ( withClickHouse ( getClickhouseClient ( ) , orgId , DatasetRowRepositoryLive ) ) ,
141139 ) ,
142140 )
143141
@@ -157,21 +155,19 @@ export const listRowsQuery = createServerFn({ method: "GET" })
157155 )
158156 . handler ( async ( { data } ) : Promise < { rows : DatasetRowRecord [ ] ; total : number } > => {
159157 const { organizationId } = await requireSession ( )
160- const client = getPostgresClient ( )
161- const chClient = getClickhouseClient ( )
158+ const orgId = OrganizationId ( organizationId )
162159
163160 const result = await Effect . runPromise (
164161 listRows ( {
165- organizationId : OrganizationId ( organizationId ) ,
162+ organizationId : orgId ,
166163 datasetId : DatasetId ( data . datasetId ) ,
167164 ...( data . versionId ? { versionId : DatasetVersionId ( data . versionId ) } : { } ) ,
168165 ...( data . search ? { search : data . search } : { } ) ,
169166 limit : data . limit ,
170167 offset : data . offset ,
171168 } ) . pipe (
172- Effect . provide ( DatasetRepositoryLive ) ,
173- Effect . provide ( DatasetRowRepositoryLive ( chClient ) ) ,
174- Effect . provide ( SqlClientLive ( client , OrganizationId ( organizationId ) ) ) ,
169+ Effect . provide ( withPostgres ( getPostgresClient ( ) , orgId , DatasetRepositoryLive ) ) ,
170+ Effect . provide ( withClickHouse ( getClickhouseClient ( ) , orgId , DatasetRowRepositoryLive ) ) ,
175171 ) ,
176172 )
177173
@@ -183,18 +179,16 @@ export const createDatasetMutation = createServerFn({ method: "POST" })
183179 . inputValidator ( z . object ( { projectId : z . string ( ) , name : z . string ( ) . min ( 1 ) } ) )
184180 . handler ( async ( { data } ) : Promise < DatasetRecord > => {
185181 const { organizationId } = await requireSession ( )
186- const client = getPostgresClient ( )
187- const chClient = getClickhouseClient ( )
182+ const orgId = OrganizationId ( organizationId )
188183
189184 const dataset = await Effect . runPromise (
190185 createDataset ( {
191- organizationId : OrganizationId ( organizationId ) ,
186+ organizationId : orgId ,
192187 projectId : ProjectId ( data . projectId ) ,
193188 name : data . name ,
194189 } ) . pipe (
195- Effect . provide ( DatasetRepositoryLive ) ,
196- Effect . provide ( DatasetRowRepositoryLive ( chClient ) ) ,
197- Effect . provide ( SqlClientLive ( client , OrganizationId ( organizationId ) ) ) ,
190+ Effect . provide ( withPostgres ( getPostgresClient ( ) , orgId , DatasetRepositoryLive ) ) ,
191+ Effect . provide ( withClickHouse ( getClickhouseClient ( ) , orgId , DatasetRowRepositoryLive ) ) ,
198192 ) ,
199193 )
200194
@@ -209,8 +203,7 @@ export const saveDatasetCsv = createServerFn({ method: "POST" })
209203 } )
210204 . handler ( async ( { data : formData } ) : Promise < { version : number ; rowCount : number } > => {
211205 const { organizationId } = await requireSession ( )
212- const client = getPostgresClient ( )
213- const chClient = getClickhouseClient ( )
206+ const orgId = OrganizationId ( organizationId )
214207
215208 const file = formData . get ( "file" )
216209 const datasetId = formData . get ( "datasetId" )
@@ -232,7 +225,7 @@ export const saveDatasetCsv = createServerFn({ method: "POST" })
232225 const fileKey = await Effect . runPromise (
233226 putInDisk ( getStorageDisk ( ) , {
234227 namespace : "datasets" ,
235- organizationId : OrganizationId ( organizationId ) ,
228+ organizationId : orgId ,
236229 projectId : ProjectId ( projectId ) ,
237230 content,
238231 } ) ,
@@ -242,11 +235,7 @@ export const saveDatasetCsv = createServerFn({ method: "POST" })
242235 Effect . gen ( function * ( ) {
243236 const repo = yield * DatasetRepository
244237 return yield * repo . updateFileKey ( { id : DatasetId ( datasetId ) , fileKey } )
245- } ) . pipe (
246- Effect . provide ( DatasetRepositoryLive ) ,
247- Effect . provide ( DatasetRowRepositoryLive ( chClient ) ) ,
248- Effect . provide ( SqlClientLive ( client , OrganizationId ( organizationId ) ) ) ,
249- ) ,
238+ } ) . pipe ( Effect . provide ( withPostgres ( getPostgresClient ( ) , orgId , DatasetRepositoryLive ) ) ) ,
250239 )
251240
252241 const parsed = Papa . parse < Record < string , string > > ( content , { header : true , skipEmptyLines : true } )
@@ -258,14 +247,13 @@ export const saveDatasetCsv = createServerFn({ method: "POST" })
258247
259248 const result = await Effect . runPromise (
260249 insertRows ( {
261- organizationId : OrganizationId ( organizationId ) ,
250+ organizationId : orgId ,
262251 datasetId : DatasetId ( datasetId ) ,
263252 rows : mappedRows ,
264253 source : "csv" ,
265254 } ) . pipe (
266- Effect . provide ( DatasetRepositoryLive ) ,
267- Effect . provide ( DatasetRowRepositoryLive ( chClient ) ) ,
268- Effect . provide ( SqlClientLive ( client , OrganizationId ( organizationId ) ) ) ,
255+ Effect . provide ( withPostgres ( getPostgresClient ( ) , orgId , DatasetRepositoryLive ) ) ,
256+ Effect . provide ( withClickHouse ( getClickhouseClient ( ) , orgId , DatasetRowRepositoryLive ) ) ,
269257 ) ,
270258 )
271259
0 commit comments