@hunter2046
Contributor

@hunter2046 hunter2046 commented Apr 9, 2021

Motivation

Expose GetHashingFunction() so that users can obtain the hash function they need to pass to NewDefaultRouter.

Modifications

Exposed GetHashingFunction() and added unit tests.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change added tests and can be verified as follows:

  • Added unit tests in producer_test.go

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (no)
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@aahmed-se aahmed-se requested a review from merlimat April 9, 2021 22:30
@hunter2046 hunter2046 marked this pull request as ready for review April 9, 2021 22:31
@merlimat
Contributor

In the MessageRouter, you're getting a TopicMetadata object which will tell you the number of partitions:

type TopicMetadata interface {

Wouldn't this already solve the problem?

@hunter2046
Contributor Author

In the MessageRouter, you're getting a TopicMetadata object which will tell you the number of partitions:

type TopicMetadata interface {

Wouldn't this already solve the problem?

IIUC, TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to record the number of partitions at the moment each new partition key is defined, and keep using that recorded value afterwards.
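
To make the "record the partition count per key" idea concrete, here is a minimal sketch. It is not part of pulsar-client-go; the `partitionSnapshots` type and its `countFor` method are hypothetical names chosen for illustration, and the sketch assumes an in-memory map is an acceptable place to keep the snapshots.

```go
package main

import (
	"fmt"
	"sync"
)

// partitionSnapshots remembers, for each partition key, the partition count
// that was in effect when the key was first seen.
// Hypothetical helper for illustration; not part of pulsar-client-go.
type partitionSnapshots struct {
	mu     sync.Mutex
	counts map[string]uint32
}

func newPartitionSnapshots() *partitionSnapshots {
	return &partitionSnapshots{counts: make(map[string]uint32)}
}

// countFor returns the partition count recorded for key. If the key has not
// been seen before, it records current and returns it.
func (p *partitionSnapshots) countFor(key string, current uint32) uint32 {
	p.mu.Lock()
	defer p.mu.Unlock()
	if n, ok := p.counts[key]; ok {
		return n
	}
	p.counts[key] = current
	return current
}

func main() {
	snaps := newPartitionSnapshots()
	fmt.Println(snaps.countFor("tenant-a", 4)) // first sighting: records and prints 4
	fmt.Println(snaps.countFor("tenant-a", 8)) // topic grew, key keeps its snapshot: prints 4
	fmt.Println(snaps.countFor("tenant-b", 8)) // new key records the current count: prints 8
}
```

In a real deployment the snapshots would presumably need to be persisted or shared between producers; that part is outside the scope of this sketch.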

I probably shouldn't paste the link here but DSP-28866 is the jira ticket that this change is made for.

@hunter2046 hunter2046 force-pushed the expose-num-partitions branch 2 times, most recently from d21a85b to 8cc3b92 April 12, 2021 18:39
@hunter2046 hunter2046 changed the title from "Expose NumPartitions() method on the producer" to "Expose NumPartitions() method on producer interface and GetHashingFunction() method" Apr 12, 2021
@hunter2046
Contributor Author

I have updated the pull request to also expose the GetHashingFunction() method to make it possible for the user to call NewDefaultRouter.

The idea about how we plan to make use of the changes can be found at https://cd.splunkdev.com/data-availability/s2s-svc/-/merge_requests/937/diffs

}

// GetHashingFunction returns the corresponding hashing function for the hashing scheme
func GetHashingFunction(s HashingScheme) func(string) uint32 {
Contributor

Why do we need to expose this publicly? An application is free to use a custom message router function, at that point it can use any hashing function.

Contributor Author

We would like to mimic what the default router does, including its hash function. The only difference would be to use a snapshotted numPartitions instead of the latest value from TopicMetadata:

if options.MessageRouter == nil {

@merlimat
Contributor

IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.

I'm not convinced exposing NumPartitions() would help here. All the custom logic should be possible to implement within a custom message router.

In any case, there's already a way to get the partitions for a topic, by using Client.TopicPartitions()

@hunter2046 hunter2046 force-pushed the expose-num-partitions branch from 8cc3b92 to 769e561 April 12, 2021 19:22
@hunter2046
Contributor Author

hunter2046 commented Apr 12, 2021

IIUC, the TopicMetadata gives us the number of partitions at the time the MessageRouter is invoked. For our use case, we need to keep recording the number of partitions when a new partition key is defined.

I'm not convinced exposing NumPartitions() would help here. All the custom logic should be possible to implement within a custom message router.

In any case, there's already a way to get the partitions for a topic, by using Client.TopicPartitions()

ACK, I didn't realize there was already a method for this. That should work. I removed the changes related to NumPartitions().

@hunter2046 hunter2046 changed the title from "Expose NumPartitions() method on producer interface and GetHashingFunction() method" to "Expose GetHashingFunction() method on producer" Apr 12, 2021
Member

@wolfstudy wolfstudy left a comment

LGTM +1

@wolfstudy
Member

I'm not sure this is the right approach. Can MessageRouter not already meet our needs?

e.g:

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: "my-partitioned-topic",
		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
			return 2
		},
	})

Member

@wolfstudy wolfstudy left a comment

Thanks @hunter2046 for working on this. Just one small comment, please check.

@hunter2046
Contributor Author

The point is wrong, can MessageRouter not meet our functions?

e.g:

	producer, err := client.CreateProducer(ProducerOptions{
		Topic: "my-partitioned-topic",
		MessageRouter: func(msg *ProducerMessage, tm TopicMetadata) int {
			fmt.Println("Routing message ", msg, " -- Partitions: ", tm.NumPartitions())
			return 2
		},
	})

Thanks for taking a look. MessageRouter is what we need. We'd like to set the MessageRouter to a function that works like the defaultRouter but with a customized number of partitions, e.g.

	internalRouter := pulsarClient.NewDefaultRouter(
		getHashingFunction(producerOptions.HashingScheme),
		producerOptions.BatchingMaxMessages,
		producerOptions.BatchingMaxSize,
		producerOptions.BatchingMaxPublishDelay,
		producerOptions.DisableBatching)
	messageRouter := func(message *pulsarClient.ProducerMessage, metadata pulsarClient.TopicMetadata) int {
		// customNumberOfPartitions could be different from metadata.NumPartitions() here
		return internalRouter(message, customNumberOfPartitions)
	}

The problem is that getHashingFunction is not exported, so we can't use it to supply the hash function that NewDefaultRouter requires.
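
The wrapper described above can be sketched without the client library. The example below is self-contained and uses stand-ins throughout: `message`, `router`, `javaStyleHash`, and `newSnapshotRouter` are hypothetical names, the 31-multiplier string hash is only an assumption standing in for whatever GetHashingFunction would return, and the sketch covers keyed messages only (it does not reproduce the default router's batching-aware behavior for messages without a key).

```go
package main

import "fmt"

// message is a stand-in for the client's ProducerMessage; only the key matters here.
type message struct {
	Key string
}

// router mirrors the call shape used in the snippet above:
// internalRouter(message, numPartitions).
type router func(msg *message, liveNumPartitions uint32) int

// javaStyleHash is a Java-style 31-multiplier string hash, used here purely
// as a stand-in for the library's hashing function (an assumption).
func javaStyleHash(s string) uint32 {
	var h uint32
	for i := 0; i < len(s); i++ {
		h = 31*h + uint32(s[i])
	}
	return h
}

// newSnapshotRouter returns a router that hashes the message key and reduces
// it modulo a partition count chosen by partitionsFor instead of the live count.
func newSnapshotRouter(hash func(string) uint32, partitionsFor func(key string) uint32) router {
	return func(msg *message, liveNumPartitions uint32) int {
		n := partitionsFor(msg.Key)
		// Fall back to the live count if the snapshot is missing or larger
		// than the number of partitions that actually exist.
		if n == 0 || n > liveNumPartitions {
			n = liveNumPartitions
		}
		if n == 0 {
			return 0
		}
		return int(hash(msg.Key) % n)
	}
}

func main() {
	// Pretend every key was pinned when the topic had 4 partitions.
	pinned := func(key string) uint32 { return 4 }
	route := newSnapshotRouter(javaStyleHash, pinned)

	// The topic now has 8 partitions, but the key is still routed modulo 4.
	fmt.Println(route(&message{Key: "k1"}, 8)) // javaStyleHash("k1") = 3366, 3366 % 4 = 2
}
```

With the live count of 8 the same key would land on partition 6 (3366 % 8), which is the kind of move the snapshot is meant to prevent.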

@hunter2046 hunter2046 requested a review from wolfstudy April 19, 2021 21:37
@wolfstudy wolfstudy added this to the 0.6.0 milestone Jun 2, 2021
@wolfstudy
Member

Sorry for the late reply. @hunter2046 All the options that NewDefaultRouter takes are already exposed as parameters. You can set them in producerOptions, and the default router is then built from the values of those options.

@hunter2046
Contributor Author

Thanks. I understand that those options, including MessageRouter, are in producerOptions. My use case (as mentioned in #507 (comment)) is that I need to set the MessageRouter in a customized way that is close to what the internalRouter does, except that it uses a different number of partitions, so that producers do not start sending messages for an existing key to a different partition while the number of partitions is being increased.

In order to do that, I need to initialize the internalRouter myself by calling NewDefaultRouter. To call NewDefaultRouter, I could certainly supply my own hashing function. However, because I want to mimic what the internalRouter does, I would like to use the same hashing function. That is why I am asking whether getHashingFunction can be exposed. I hope this makes sense.
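
The concern about keys moving when partitions are added can be illustrated with a short, self-contained sketch. It assumes key-based routing of the form hash(key) % numPartitions; `fnvHash` and `movedKeys` are hypothetical helpers, and FNV-1a from the Go standard library is only a stand-in for the client's hashing scheme.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvHash is a stand-in hash; the client's real hashing scheme may differ.
func fnvHash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s)) // Write on a hash.Hash never returns an error
	return h.Sum32()
}

// movedKeys counts the keys whose partition (hash % n) changes when the
// partition count goes from oldN to newN. Both counts must be non-zero.
func movedKeys(hash func(string) uint32, keys []string, oldN, newN uint32) int {
	moved := 0
	for _, k := range keys {
		h := hash(k)
		if h%oldN != h%newN {
			moved++
		}
	}
	return moved
}

func main() {
	keys := make([]string, 0, 1000)
	for i := 0; i < 1000; i++ {
		keys = append(keys, fmt.Sprintf("key-%d", i))
	}

	// Routing with the live count: some existing keys change partition.
	fmt.Printf("moved when growing 4 -> 8: %d of %d\n", movedKeys(fnvHash, keys, 4, 8), len(keys))

	// Routing with a pinned (snapshotted) count: no existing key moves.
	fmt.Printf("moved with a pinned count of 4: %d of %d\n", movedKeys(fnvHash, keys, 4, 4), len(keys))
}
```

Pinning the count keeps existing keys stable only if the same hash function is used before and after, which is why the request is to reuse the router's own hashing function rather than substitute another one.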

@wolfstudy wolfstudy modified the milestones: 0.6.0, 0.7.0 Jul 21, 2021
@wolfstudy wolfstudy modified the milestones: 0.7.0, v0.8.0 Nov 1, 2021
@wolfstudy wolfstudy removed this from the v0.8.0 milestone Feb 16, 2022
@wolfstudy wolfstudy added this to the 0.9.0 milestone Feb 16, 2022
@freeznet freeznet modified the milestones: v0.9.0, v0.10.0 Jul 4, 2022
@RobertIndie RobertIndie modified the milestones: v0.10.0, v0.11.0 Mar 27, 2023
@RobertIndie RobertIndie modified the milestones: v0.11.0, v0.12.0 Jul 4, 2023
@RobertIndie RobertIndie modified the milestones: v0.12.0, v0.13.0 Jan 10, 2024
@RobertIndie RobertIndie modified the milestones: v0.13.0, v0.14.0 Jul 15, 2024
@RobertIndie RobertIndie modified the milestones: v0.14.0, v0.15.0 Oct 8, 2024
@RobertIndie RobertIndie modified the milestones: v0.15.0, v0.16.0 May 15, 2025
@RobertIndie RobertIndie modified the milestones: v0.16.0, v0.17.0 Jul 29, 2025
@RobertIndie RobertIndie modified the milestones: v0.17.0, v0.18.0 Oct 23, 2025
@RobertIndie RobertIndie modified the milestones: v0.18.0, v0.19.0 Dec 1, 2025