@@ -172,17 +172,15 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[
172172
173173 partitions , err := m .Partitions (topic )
174174
175- if err != nil {
176- if err != errTopicNotFound {
177- return fmt .Errorf ("error checking topic: %v" , err )
178- }
175+ if err != nil && err != errTopicNotFound {
176+ return errTopicChecking (topic , err )
179177 }
180178 // no topic yet, let's create it
181179 if len (partitions ) == 0 {
182180
183181 // (or not)
184182 if m .topicManagerConfig .NoCreate {
185- return fmt . Errorf ( "topic `%s` does not exist but the manager is configured with NoCreate, so it will not attempt to create it" , topic )
183+ return errTopicNoCreate ( topic )
186184 }
187185
188186 return m .createTopic (topic ,
@@ -195,7 +193,7 @@ func (m *topicManager) ensureExists(topic string, npar, rfactor int, config map[
195193
196194 // partitions do not match
197195 if len (partitions ) != npar {
198- return m .handleConfigMismatch (fmt .Sprintf ("partition count mismatch for topic %s . Need %d, but existing topic has %d" , topic , npar , len (partitions )))
196+ return m .handleConfigMismatch (fmt .Sprintf ("partition count mismatch for topic '%s' . Need %d, but existing topic has %d" , topic , npar , len (partitions )))
199197 }
200198
201199 // check additional config values via the cluster admin if our current version supports it
@@ -249,7 +247,7 @@ func (m *topicManager) waitForCreated(topic string) error {
249247 case errTopicNotFound :
250248 time .Sleep (time .Second )
251249 default :
252- return fmt . Errorf ( "error checking topic: %w" , err )
250+ return errTopicChecking ( topic , err )
253251 }
254252 }
255253 return fmt .Errorf ("waiting for topic %s to be created timed out" , topic )
0 commit comments