@@ -11,8 +11,8 @@ import (
11
11
"github.com/opentracing/opentracing-go"
12
12
"github.com/parquet-go/parquet-go"
13
13
"github.com/pkg/errors"
14
+ "github.com/prometheus-community/parquet-common/queryable"
14
15
"github.com/prometheus-community/parquet-common/schema"
15
- "github.com/prometheus-community/parquet-common/search"
16
16
parquet_storage "github.com/prometheus-community/parquet-common/storage"
17
17
"github.com/prometheus/client_golang/prometheus"
18
18
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -125,14 +125,14 @@ func NewParquetQueryable(
125
125
return nil , err
126
126
}
127
127
128
- cache , err := newCache [* parquet_storage.ParquetShard ]("parquet-shards" , config .ParquetQueryableShardCacheSize , newCacheMetrics (reg ))
128
+ cache , err := newCache [parquet_storage.ParquetShard ]("parquet-shards" , config .ParquetQueryableShardCacheSize , newCacheMetrics (reg ))
129
129
if err != nil {
130
130
return nil , err
131
131
}
132
132
133
133
cDecoder := schema .NewPrometheusParquetChunksDecoder (chunkenc .NewPool ())
134
134
135
- parquetQueryable , err := search .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]* parquet_storage.ParquetShard , error ) {
135
+ parquetQueryable , err := queryable .NewParquetQueryable (cDecoder , func (ctx context.Context , mint , maxt int64 ) ([]parquet_storage.ParquetShard , error ) {
136
136
userID , err := tenant .TenantID (ctx )
137
137
if err != nil {
138
138
return nil , err
@@ -143,8 +143,8 @@ func NewParquetQueryable(
143
143
return nil , errors .Errorf ("failed to extract blocks from context" )
144
144
}
145
145
userBkt := bucket .NewUserBucketClient (userID , bucketClient , limits )
146
-
147
- shards := make ([]* parquet_storage.ParquetShard , len (blocks ))
146
+ bucketOpener := parquet_storage . NewParquetBucketOpener ( userBkt )
147
+ shards := make ([]parquet_storage.ParquetShard , len (blocks ))
148
148
errGroup := & errgroup.Group {}
149
149
150
150
span , ctx := opentracing .StartSpanFromContext (ctx , "parquetQuerierWithFallback.OpenShards" )
@@ -157,16 +157,18 @@ func NewParquetQueryable(
157
157
if shard == nil {
158
158
// we always only have 1 shard - shard 0
159
159
// Use context.Background() here as the file can be cached and live after the request ends.
160
- shard , err = parquet_storage .OpenParquetShard ( context . WithoutCancel ( ctx ),
161
- userBkt ,
160
+ shard , err = parquet_storage .NewParquetShardOpener (
161
+ context . WithoutCancel ( ctx ) ,
162
162
block .ID .String (),
163
+ bucketOpener ,
164
+ bucketOpener ,
163
165
0 ,
164
166
parquet_storage .WithFileOptions (
165
167
parquet .SkipMagicBytes (true ),
166
168
parquet .ReadBufferSize (100 * 1024 ),
167
169
parquet .SkipBloomFilters (true ),
170
+ parquet .OptimisticRead (true ),
168
171
),
169
- parquet_storage .WithOptimisticReader (true ),
170
172
)
171
173
if err != nil {
172
174
return errors .Wrapf (err , "failed to open parquet shard. block: %v" , block .ID .String ())
0 commit comments