Skip to content

Commit 148c10c

Browse files
authored
add timestamp to schema info (#1436)
* add timestamp to schemainfo * add tests
1 parent 7b4b3a6 commit 148c10c

File tree

3 files changed

+108
-0
lines changed

3 files changed

+108
-0
lines changed

pulsaradmin/pkg/admin/schema_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) {
120120
info, err := admin.Schemas().GetSchemaInfo(topic)
121121
assert.NoError(t, err)
122122
assert.Equal(t, schemaInfo.Type, info.Type)
123+
assert.NotNil(t, info.Timestamp)
124+
assert.NotZero(t, info.Timestamp)
123125

124126
version, err := admin.Schemas().GetVersionBySchemaInfo(topic, schemaInfo)
125127
assert.NoError(t, err)

pulsaradmin/pkg/utils/schema_util.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ type SchemaInfo struct {
2222
Schema []byte `json:"schema"`
2323
Type string `json:"type"`
2424
Properties map[string]string `json:"properties"`
25+
Timestamp int64 `json:"timestamp,omitempty"`
2526
}
2627

2728
type SchemaInfoWithVersion struct {
@@ -64,6 +65,7 @@ func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaRespo
6465
info.Type = response.Type
6566
info.Properties = response.Properties
6667
info.Name = tn.GetLocalName()
68+
info.Timestamp = response.Timestamp
6769

6870
return info
6971
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package utils
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestConvertGetSchemaResponseToSchemaInfoTimestamp(t *testing.T) {
27+
topicName, err := GetTopicName("persistent://tenant/ns/topic")
28+
require.NoError(t, err)
29+
30+
props := map[string]string{"key": "value"}
31+
response := GetSchemaResponse{
32+
Version: 3,
33+
Type: "STRING",
34+
Timestamp: 123456789,
35+
Data: "payload",
36+
Properties: props,
37+
}
38+
39+
info := ConvertGetSchemaResponseToSchemaInfo(topicName, response)
40+
require.NotNil(t, info)
41+
require.Equal(t, "topic", info.Name)
42+
require.Equal(t, []byte("payload"), info.Schema)
43+
require.Equal(t, "STRING", info.Type)
44+
require.Equal(t, props, info.Properties)
45+
require.Equal(t, int64(123456789), info.Timestamp)
46+
}
47+
48+
func TestConvertGetSchemaResponseToSchemaInfoWithVersionTimestamp(t *testing.T) {
49+
topicName, err := GetTopicName("persistent://tenant/ns/topic")
50+
require.NoError(t, err)
51+
52+
response := GetSchemaResponse{
53+
Version: 9,
54+
Type: "JSON",
55+
Timestamp: 987654321,
56+
Data: "{}",
57+
Properties: map[string]string{
58+
"schema": "json",
59+
},
60+
}
61+
62+
infoWithVersion := ConvertGetSchemaResponseToSchemaInfoWithVersion(topicName, response)
63+
require.NotNil(t, infoWithVersion)
64+
require.Equal(t, int64(9), infoWithVersion.Version)
65+
require.NotNil(t, infoWithVersion.SchemaInfo)
66+
require.Equal(t, "topic", infoWithVersion.SchemaInfo.Name)
67+
require.Equal(t, "JSON", infoWithVersion.SchemaInfo.Type)
68+
require.Equal(t, int64(987654321), infoWithVersion.SchemaInfo.Timestamp)
69+
}
70+
71+
func TestConvertGetAllSchemasResponseToSchemaInfosWithVersionTimestamp(t *testing.T) {
72+
topicName, err := GetTopicName("persistent://tenant/ns/topic")
73+
require.NoError(t, err)
74+
75+
response := GetAllSchemasResponse{
76+
Schemas: []GetSchemaResponse{
77+
{
78+
Version: 1,
79+
Type: "AVRO",
80+
Timestamp: 1000,
81+
Data: "schema-1",
82+
Properties: map[string]string{"idx": "1"},
83+
},
84+
{
85+
Version: 2,
86+
Type: "PROTOBUF",
87+
Timestamp: 2000,
88+
Data: "schema-2",
89+
Properties: map[string]string{"idx": "2"},
90+
},
91+
},
92+
}
93+
94+
infos := ConvertGetAllSchemasResponseToSchemaInfosWithVersion(topicName, response)
95+
require.Len(t, infos, 2)
96+
97+
require.Equal(t, int64(1), infos[0].Version)
98+
require.NotNil(t, infos[0].SchemaInfo)
99+
require.Equal(t, int64(1000), infos[0].SchemaInfo.Timestamp)
100+
101+
require.Equal(t, int64(2), infos[1].Version)
102+
require.NotNil(t, infos[1].SchemaInfo)
103+
require.Equal(t, int64(2000), infos[1].SchemaInfo.Timestamp)
104+
}

0 commit comments

Comments
 (0)