Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,51 @@ RD_EXPORT void
rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster,
int max_size);

/**
* @brief Set the maximum number of fetch sessions allowed in a share group.
*
* New sessions attempted via ShareFetch with epoch 0 when the group
* is at capacity will receive SHARE_SESSION_LIMIT_REACHED.
*
* Default is 2000 (per KIP-932 group.share.max.share.sessions).
*
* @param mcluster Mock cluster instance.
* @param max_fetch_sessions Maximum fetch sessions allowed. 0 = unlimited.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_max_fetch_sessions(
rd_kafka_mock_cluster_t *mcluster,
int max_fetch_sessions);

/**
* @brief Set the maximum number of in-flight record locks per share-partition.
*
* Once the limit is reached, no more records are acquired until existing
* locks are released (via ack, release, reject, or lock expiry).
*
* Default is 2000 (per KIP-932 group.share.partition.max.record.locks).
*
* @param mcluster Mock cluster instance.
* @param max_record_locks Maximum in-flight records per partition. 0 = unlimited.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_max_record_locks(
rd_kafka_mock_cluster_t *mcluster,
int max_record_locks);

/**
* @brief Set the auto offset reset policy for share groups.
*
* Controls where SPSO is initialized when a share-partition is first
* consumed.
*
* Default is 0 ("latest") per KIP-932.
*
* @param mcluster Mock cluster instance.
* @param auto_offset_reset 0 = latest, 1 = earliest.
*/
RD_EXPORT void rd_kafka_mock_sharegroup_set_auto_offset_reset(
rd_kafka_mock_cluster_t *mcluster,
int auto_offset_reset);

/**
* @brief Set a manual target assignment for a sharegroup.
*
Expand Down
394 changes: 298 additions & 96 deletions src/rdkafka_mock_handlers.c

Large diffs are not rendered by default.

24 changes: 21 additions & 3 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,10 @@ typedef struct rd_kafka_mock_cgrp_consumer_member_s {
* @brief Share record state.
*/
enum rd_kafka_mock_sgrp_record_state_e {
RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE = 0,
RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED = 1,
RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED = 2
RD_KAFKA_MOCK_SGRP_RECORD_AVAILABLE = 0,
RD_KAFKA_MOCK_SGRP_RECORD_ACQUIRED = 1,
RD_KAFKA_MOCK_SGRP_RECORD_ACKNOWLEDGED = 2,
RD_KAFKA_MOCK_SGRP_RECORD_ARCHIVED = 3
};

typedef struct rd_kafka_mock_sgrp_record_state_s {
Expand Down Expand Up @@ -221,6 +222,7 @@ typedef struct rd_kafka_mock_sgrp_fetch_session_s {
int32_t session_epoch;
rd_ts_t ts_last_activity;
rd_kafka_topic_partition_list_t *partitions;
int partition_start_idx; /**< Rotation index for starvation prevention */
} rd_kafka_mock_sgrp_fetch_session_t;

/**
Expand Down Expand Up @@ -258,6 +260,13 @@ typedef struct rd_kafka_mock_sharegroup_s {
int isolation_level; /**< Share isolation level */
int max_size; /**< Max members allowed.
* 0 = unlimited (default). */
int max_fetch_sessions; /**< Max fetch sessions allowed.
* 0 = unlimited (default 2000). */
int max_record_locks; /**< Max in-flight records per
* share-partition.
* 0 = unlimited (default 2000). */
int auto_offset_reset; /**< 0 = latest (default per KIP-932),
* 1 = earliest. */
} rd_kafka_mock_sharegroup_t;

/**
Expand Down Expand Up @@ -643,6 +652,15 @@ struct rd_kafka_mock_cluster_s {
/** Max members allowed in share group (KIP 932).
* 0 = unlimited. */
int sharegroup_max_size;
/** Max fetch sessions per share group (KIP 932).
* 0 = unlimited. */
int sharegroup_max_fetch_sessions;
/** Max in-flight records per share-partition (KIP 932).
* 0 = unlimited. */
int sharegroup_max_record_locks;
/** Auto offset reset (KIP 932).
* 0 = latest (default), 1 = earliest. */
int sharegroup_auto_offset_reset;
} defaults;

/**< Dynamic array of IO handlers for corresponding fd in .fds */
Expand Down
52 changes: 52 additions & 0 deletions src/rdkafka_mock_sharegrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ void rd_kafka_mock_sharegrps_init(rd_kafka_mock_cluster_t *mcluster) {
mcluster->defaults.sharegroup_record_lock_duration_ms = 0;
mcluster->defaults.sharegroup_max_size = 0;
mcluster->defaults.sharegroup_isolation_level = 0;
mcluster->defaults.sharegroup_max_fetch_sessions = 2000;
mcluster->defaults.sharegroup_max_record_locks = 2000;
mcluster->defaults.sharegroup_auto_offset_reset = 0; /* latest */
}

/**
Expand Down Expand Up @@ -114,6 +117,12 @@ rd_kafka_mock_sharegroup_get(rd_kafka_mock_cluster_t *mcluster,
mcluster->defaults.sharegroup_record_lock_duration_ms;
mshgrp->isolation_level = mcluster->defaults.sharegroup_isolation_level;
mshgrp->max_size = mcluster->defaults.sharegroup_max_size;
mshgrp->max_fetch_sessions =
mcluster->defaults.sharegroup_max_fetch_sessions;
mshgrp->max_record_locks =
mcluster->defaults.sharegroup_max_record_locks;
mshgrp->auto_offset_reset =
mcluster->defaults.sharegroup_auto_offset_reset;

rd_kafka_timer_start(&mcluster->timers, &mshgrp->session_tmr,
1000 * 1000 /* 1s */,
Expand Down Expand Up @@ -778,6 +787,49 @@ void rd_kafka_mock_sharegroup_set_max_size(rd_kafka_mock_cluster_t *mcluster,
mtx_unlock(&mcluster->lock);
}

/**
* @brief Set the maximum number of fetch sessions allowed in a share group.
*/
void rd_kafka_mock_sharegroup_set_max_fetch_sessions(
rd_kafka_mock_cluster_t *mcluster,
int max_fetch_sessions) {
rd_kafka_mock_sharegroup_t *mshgrp;
mtx_lock(&mcluster->lock);
TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link)
mshgrp->max_fetch_sessions = max_fetch_sessions;
mcluster->defaults.sharegroup_max_fetch_sessions = max_fetch_sessions;
mtx_unlock(&mcluster->lock);
}

/**
* @brief Set the maximum number of in-flight record locks per
* share-partition.
*/
void rd_kafka_mock_sharegroup_set_max_record_locks(
rd_kafka_mock_cluster_t *mcluster,
int max_record_locks) {
rd_kafka_mock_sharegroup_t *mshgrp;
mtx_lock(&mcluster->lock);
TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link)
mshgrp->max_record_locks = max_record_locks;
mcluster->defaults.sharegroup_max_record_locks = max_record_locks;
mtx_unlock(&mcluster->lock);
}

/**
* @brief Set the auto offset reset policy for share groups.
*/
void rd_kafka_mock_sharegroup_set_auto_offset_reset(
rd_kafka_mock_cluster_t *mcluster,
int auto_offset_reset) {
rd_kafka_mock_sharegroup_t *mshgrp;
mtx_lock(&mcluster->lock);
TAILQ_FOREACH(mshgrp, &mcluster->sharegrps, link)
mshgrp->auto_offset_reset = auto_offset_reset;
mcluster->defaults.sharegroup_auto_offset_reset = auto_offset_reset;
mtx_unlock(&mcluster->lock);
}

/**
* @brief Destroy share fetch session.
*/
Expand Down
Loading