@@ -19,6 +19,7 @@ package schemaregistry
1919import (
2020 "errors"
2121 "fmt"
22+ "github.com/google/uuid"
2223 "net/url"
2324 "reflect"
2425 "sort"
@@ -67,6 +68,8 @@ type mockclient struct {
6768 infoToSchemaCacheLock sync.RWMutex
6869 idToSchemaCache map [subjectID ]infoCacheEntry
6970 idToSchemaCacheLock sync.RWMutex
71+ guidToSchemaCache map [string ]infoCacheEntry
72+ guidToSchemaCacheLock sync.RWMutex
7073 schemaToVersionCache map [subjectJSON ]versionCacheEntry
7174 schemaToVersionCacheLock sync.RWMutex
7275 configCache map [string ]ServerConfig
@@ -118,7 +121,7 @@ func (c *mockclient) RegisterFullResponse(subject string, schema SchemaInfo, nor
118121 return * cacheEntryVal .metadata , nil
119122 }
120123
121- id , err := c .getIDFromRegistry (subject , schema )
124+ id , guid , err := c .getIDFromRegistry (subject , schema )
122125 if err != nil {
123126 return SchemaMetadata {
124127 ID : - 1 ,
@@ -127,14 +130,15 @@ func (c *mockclient) RegisterFullResponse(subject string, schema SchemaInfo, nor
127130 result = SchemaMetadata {
128131 SchemaInfo : schema ,
129132 ID : id ,
133+ GUID : guid ,
130134 }
131135 c .infoToSchemaCacheLock .Lock ()
132136 c .infoToSchemaCache [cacheKey ] = metadataCacheEntry {& result , false }
133137 c .infoToSchemaCacheLock .Unlock ()
134138 return result , nil
135139}
136140
137- func (c * mockclient ) getIDFromRegistry (subject string , schema SchemaInfo ) (int , error ) {
141+ func (c * mockclient ) getIDFromRegistry (subject string , schema SchemaInfo ) (int , string , error ) {
138142 var id = - 1
139143 c .idToSchemaCacheLock .RLock ()
140144 for key , value := range c .idToSchemaCache {
@@ -144,9 +148,18 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
144148 }
145149 }
146150 c .idToSchemaCacheLock .RUnlock ()
151+ var guid string
152+ c .guidToSchemaCacheLock .RLock ()
153+ for key , value := range c .guidToSchemaCache {
154+ if schemasEqual (* value .info , schema ) {
155+ guid = key
156+ break
157+ }
158+ }
159+ c .guidToSchemaCacheLock .RUnlock ()
147160 err := c .generateVersion (subject , schema )
148161 if err != nil {
149- return - 1 , err
162+ return - 1 , "" , err
150163 }
151164 if id < 0 {
152165 id = c .counter .increment ()
@@ -157,8 +170,13 @@ func (c *mockclient) getIDFromRegistry(subject string, schema SchemaInfo) (int,
157170 c .idToSchemaCacheLock .Lock ()
158171 c .idToSchemaCache [idCacheKey ] = infoCacheEntry {& schema , false }
159172 c .idToSchemaCacheLock .Unlock ()
173+
174+ guid = uuid .New ().String ()
175+ c .guidToSchemaCacheLock .Lock ()
176+ c .guidToSchemaCache [guid ] = infoCacheEntry {& schema , false }
177+ c .guidToSchemaCacheLock .Unlock ()
160178 }
161- return id , nil
179+ return id , guid , nil
162180}
163181
164182func (c * mockclient ) generateVersion (subject string , schema SchemaInfo ) error {
@@ -204,6 +222,23 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf
204222 return SchemaInfo {}, & posErr
205223}
206224
225+ // GetByGUID returns the schema identified by guid
226+ // Returns Schema object on success
227+ func (c * mockclient ) GetByGUID (guid string ) (schema SchemaInfo , err error ) {
228+ c .guidToSchemaCacheLock .RLock ()
229+ cacheEntryValue , ok := c .guidToSchemaCache [guid ]
230+ c .guidToSchemaCacheLock .RUnlock ()
231+ if ok {
232+ return * cacheEntryValue .info , nil
233+ }
234+ posErr := url.Error {
235+ Op : "GET" ,
236+ URL : c .url .String () + fmt .Sprintf (internal .SchemasByGUID , guid ),
237+ Err : errors .New ("Schema Not Found" ),
238+ }
239+ return SchemaInfo {}, & posErr
240+ }
241+
207242func (c * mockclient ) GetSubjectsAndVersionsByID (id int ) (subjectsAndVersions []SubjectAndVersion , err error ) {
208243 subjectsAndVersions = make ([]SubjectAndVersion , 0 )
209244
@@ -255,10 +290,21 @@ func (c *mockclient) GetSubjectsAndVersionsByID(id int) (subjectsAndVersions []S
255290
256291// GetID checks if a schema has been registered with the subject. Returns ID if the registration can be found
257292func (c * mockclient ) GetID (subject string , schema SchemaInfo , normalize bool ) (id int , err error ) {
258- schemaJSON , err := schema . MarshalJSON ( )
293+ metadata , err := c . GetIDFullResponse ( subject , schema , normalize )
259294 if err != nil {
260295 return - 1 , err
261296 }
297+ return metadata .ID , err
298+ }
299+
300+ // GetIDFullResponse checks if a schema has been registered with the subject. Returns ID if the registration can be found
301+ func (c * mockclient ) GetIDFullResponse (subject string , schema SchemaInfo , normalize bool ) (result SchemaMetadata , err error ) {
302+ schemaJSON , err := schema .MarshalJSON ()
303+ if err != nil {
304+ return SchemaMetadata {
305+ ID : - 1 ,
306+ }, err
307+ }
262308 cacheKey := subjectJSON {
263309 subject : subject ,
264310 json : string (schemaJSON ),
@@ -270,15 +316,17 @@ func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (i
270316 }
271317 c .infoToSchemaCacheLock .RUnlock ()
272318 if ok {
273- return cacheEntryVal .metadata . ID , nil
319+ return * cacheEntryVal .metadata , nil
274320 }
275321
276322 posErr := url.Error {
277323 Op : "GET" ,
278324 URL : c .url .String () + fmt .Sprintf (internal .Subjects , url .PathEscape (subject )),
279325 Err : errors .New ("Subject Not found" ),
280326 }
281- return - 1 , & posErr
327+ return SchemaMetadata {
328+ ID : - 1 ,
329+ }, & posErr
282330}
283331
284332// GetLatestSchemaMetadata fetches latest version registered with the provided subject
@@ -345,10 +393,28 @@ func (c *mockclient) GetSchemaMetadataIncludeDeleted(subject string, version int
345393 }
346394 return SchemaMetadata {}, & posErr
347395 }
396+ var guid string
397+ c .guidToSchemaCacheLock .RLock ()
398+ for key , value := range c .guidToSchemaCache {
399+ if schemasEqual (* value .info , info ) && (! value .softDeleted || deleted ) {
400+ guid = key
401+ break
402+ }
403+ }
404+ c .guidToSchemaCacheLock .RUnlock ()
405+ if guid == "" {
406+ posErr := url.Error {
407+ Op : "GET" ,
408+ URL : c .url .String () + fmt .Sprintf (internal .Versions , url .PathEscape (subject ), version ),
409+ Err : errors .New ("Subject Not found" ),
410+ }
411+ return SchemaMetadata {}, & posErr
412+ }
348413 return SchemaMetadata {
349414 SchemaInfo : info ,
350415
351416 ID : id ,
417+ GUID : guid ,
352418 Subject : subject ,
353419 Version : version ,
354420 }, nil
0 commit comments