Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 76 additions & 10 deletions bdd/go/tests/basic_messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type basicMessagingCtx struct {
lastPollMessages *iggcon.PolledMessage
lastStreamID *uint32
lastStreamName *string
lastDeletedStreamID *uint32
lastTopicID *uint32
lastTopicName *string
lastTopicPartitions *uint32
Expand Down Expand Up @@ -209,6 +210,56 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex
return nil
}

func (s basicMessagingSteps) whenUpdateStreamName(ctx context.Context, newName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
if err := c.client.UpdateStream(streamIdentifier, newName); err != nil {
return fmt.Errorf("failed to update stream: %w", err)
}
c.lastStreamName = &newName
return nil
}

func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expectedName string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
stream, err := c.client.GetStream(streamIdentifier)
if err != nil {
return fmt.Errorf("failed to get stream: %w", err)
}
if stream.Name != expectedName {
return fmt.Errorf("expected stream name %s, got %s", expectedName, stream.Name)
}
return nil
}

func (s basicMessagingSteps) whenDeleteStreamByName(ctx context.Context, name string) error {
c := getBasicMessagingCtx(ctx)
streamIdentifier, err := iggcon.NewIdentifier(name)
if err != nil {
return fmt.Errorf("invalid stream name %q: %w", name, err)
}
if err := c.client.DeleteStream(streamIdentifier); err != nil {
return fmt.Errorf("failed to delete stream: %w", err)
}
c.lastDeletedStreamID = c.lastStreamID
c.lastStreamID = nil
return nil
}

func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error {
c := getBasicMessagingCtx(ctx)
if c.lastDeletedStreamID == nil {
return errors.New("no stream was deleted in this scenario")
}
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastDeletedStreamID)
stream, err := c.client.GetStream(streamIdentifier)
if err == nil && stream != nil {
return fmt.Errorf("stream %d still exists after deletion", *c.lastDeletedStreamID)
}
return nil
}

func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error {
client := getBasicMessagingCtx(ctx).client
streams, err := client.GetStreams()
Expand Down Expand Up @@ -305,27 +356,42 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) {
return context.WithValue(context.Background(), basicMessagingCtxKey{}, &basicMessagingCtx{}), nil
})
s := &basicMessagingSteps{}
sc.Step(`I have a running Iggy server`, s.givenRunningServer)
sc.Step(`I am authenticated as the root user`, s.givenAuthenticationAsRoot)
sc.Step(`^I have a running Iggy server$`, s.givenRunningServer)
sc.Step(`^I am authenticated as the root user$`, s.givenAuthenticationAsRoot)
sc.Step(`^I send (\d+) messages to stream (\d+), topic (\d+), partition (\d+)$`, s.whenSendMessages)
sc.Step(`^I poll messages from stream (\d+), topic (\d+), partition (\d+) starting from offset (\d+)$`, s.whenPollMessages)
sc.Step(`all messages should be sent successfully`, s.thenMessageSentSuccessfully)
sc.Step(`^all messages should be sent successfully$`, s.thenMessageSentSuccessfully)
sc.Step(`^I should receive (\d+) messages$`, s.thenShouldReceiveMessages)
sc.Step(`^the messages should have sequential offsets from (\d+) to (\d+)$`, s.thenMessagesHaveSequentialOffsets)
sc.Step(`each message should have the expected payload content`, s.thenMessagesHaveExpectedPayload)
sc.Step(`the last polled message should match the last sent message`, s.thenLastPolledMessageMatchesSent)
sc.Step(`^each message should have the expected payload content$`, s.thenMessagesHaveExpectedPayload)
sc.Step(`^the last polled message should match the last sent message$`, s.thenLastPolledMessageMatchesSent)
sc.Step(`^the stream should have name "([^"]*)"$`, s.thenStreamHasName)
sc.Step(`the stream should be created successfully`, s.thenStreamCreatedSuccessfully)
sc.Step(`^the stream should be created successfully$`, s.thenStreamCreatedSuccessfully)
sc.Step(`^I create a stream with name "([^"]*)"$`, s.whenCreateStream)
sc.Step(`I have no streams in the system`, s.givenNoStreams)
sc.Step(`^I have no streams in the system$`, s.givenNoStreams)
sc.Step(`^I create a topic with name "([^"]*)" in stream (\d+) with (\d+) partitions$`, s.whenCreateTopic)
sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should be created successfully$`, s.thenTopicCreatedSuccessfully)
sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName)
sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions)
sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName)
sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated)
sc.Step(`^I delete the stream with name "([^"]*)"$`, s.whenDeleteStreamByName)
sc.Step(`^the stream should be deleted successfully$`, s.thenStreamDeletedSuccessfully)
sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) {
c := getBasicMessagingCtx(ctx)
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
// Best-effort cleanup: if the scenario left a stream behind (e.g.
// failed before the explicit delete step), try to remove it so the
// next scenario starts clean. A failure here is intentionally
// ignored; for guaranteed teardown across all resource kinds, a
// global cleanup script remains the better long-term solution.
if c.client != nil && c.lastStreamID != nil {
streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID)
_ = c.client.DeleteStream(streamIdentifier)
}
if c.client != nil {
if err := c.client.Close(); err != nil {
scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err))
}
}
return ctx, scErr
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.cucumber.java.en.When;
import org.apache.iggy.client.blocking.IggyBaseClient;
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
import org.apache.iggy.identifier.StreamId;
import org.apache.iggy.message.Message;
import org.apache.iggy.message.Partitioning;
import org.apache.iggy.message.PollingStrategy;
Expand All @@ -43,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BasicMessagingSteps {
Expand Down Expand Up @@ -209,6 +211,30 @@ public void lastPolledMessageMatchesSent() {
assertEquals(context.lastSentMessage, lastPayload, "Last message should match sent message");
}

@When("I update the stream name to {string}")
public void updateStreamName(String newName) {
getClient().streams().updateStream(context.lastStreamId, newName);
context.lastStreamName = newName;
}

@Then("the stream name should be updated to {string}")
public void streamNameUpdated(String expectedName) {
Optional<StreamDetails> stream = getClient().streams().getStream(context.lastStreamId);
assertTrue(stream.isPresent(), "Stream should exist");
assertEquals(expectedName, stream.get().name(), "Stream name should be updated");
}

@When("I delete the stream with name {string}")
public void deleteStream(String name) {
getClient().streams().deleteStream(StreamId.of(name));
context.lastStreamId = null;
}

@Then("the stream should be deleted successfully")
public void streamDeletedSuccessfully() {
assertNull(context.lastStreamId, "Stream should have been deleted");
}

private IggyBaseClient getClient() {
if (context.client == null) {
throw new IllegalStateException("Iggy client not initialized");
Expand Down
40 changes: 40 additions & 0 deletions bdd/python/tests/test_basic_messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,43 @@ def verify_last_message_match(context):
last_polled_payload = last_polled.payload().decode("utf-8")

assert last_polled_payload == context.last_sent_message


@when(parsers.parse('I update the stream name to "{new_name}"'))
def update_stream_name(context, new_name):
"""Update the stream name"""

async def _update():
await context.client.update_stream(context.last_stream_id, new_name)
context.last_stream_name = new_name

asyncio.run(_update())


@then(parsers.parse('the stream name should be updated to "{expected_name}"'))
def verify_stream_name_updated(context, expected_name):
"""Verify stream name was updated"""

async def _verify():
stream = await context.client.get_stream(context.last_stream_id)
assert stream is not None
assert stream.name == expected_name

asyncio.run(_verify())


@when(parsers.parse('I delete the stream with name "{name}"'))
def delete_stream(context, name):
"""Delete the stream by name"""

async def _delete():
await context.client.delete_stream(name)
context.last_stream_id = None

asyncio.run(_delete())


@then("the stream should be deleted successfully")
def verify_stream_deleted(context):
"""Verify stream was deleted"""
assert context.last_stream_id is None
46 changes: 45 additions & 1 deletion bdd/rust/tests/steps/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use crate::common::global_context::GlobalContext;
use cucumber::{given, then, when};
use iggy::prelude::StreamClient;
use iggy::prelude::{Identifier, StreamClient};

#[given("I have no streams in the system")]
pub async fn given_no_streams(world: &mut GlobalContext) {
Expand Down Expand Up @@ -64,3 +64,47 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri
"Stream should have expected name"
);
}

#[when(regex = r#"^I update the stream name to "([^"]*)"$"#)]
pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
client
.update_stream(&identifier, &new_name)
.await
.expect("Should be able to update stream");
world.last_stream_name = Some(new_name);
}

#[then(regex = r#"^the stream name should be updated to "([^"]*)"$"#)]
pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) {
let client = world.client.as_ref().expect("Client should be available");
let stream_id = world.last_stream_id.expect("Stream should exist");
let identifier = Identifier::numeric(stream_id).unwrap();
let stream = client
.get_stream(&identifier)
.await
.expect("Should be able to get stream")
.expect("Stream should exist");
assert_eq!(stream.name, expected_name, "Stream name should be updated");
}

#[when(regex = r#"^I delete the stream with name "([^"]*)"$"#)]
pub async fn when_delete_stream(world: &mut GlobalContext, name: String) {
let client = world.client.as_ref().expect("Client should be available");
let identifier = Identifier::named(&name).expect("Stream name should be valid");
client
.delete_stream(&identifier)
.await
.expect("Should be able to delete stream");
world.last_stream_id = None;
}

#[then("the stream should be deleted successfully")]
pub async fn then_stream_deleted_successfully(world: &mut GlobalContext) {
assert!(
world.last_stream_id.is_none(),
"Stream should have been deleted"
);
}
6 changes: 6 additions & 0 deletions bdd/scenarios/basic_messaging.feature
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,9 @@ Feature: Basic Messaging Operations
And the messages should have sequential offsets from 0 to 9
And each message should have the expected payload content
And the last polled message should match the last sent message

When I update the stream name to "test-stream-updated"
Then the stream name should be updated to "test-stream-updated"

When I delete the stream with name "test-stream-updated"
Then the stream should be deleted successfully
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@ public void ThenTheLastPolledMessageShouldMatchTheLastSentMessage()
lastPolled.Header.Id.ShouldBe(_context.LastSendMessage.Header.Id);
lastPolled.Payload.ShouldBe(_context.LastSendMessage.Payload);
}

[When("I update the stream name to {string}")]
public async Task WhenIUpdateTheStreamNameTo(string newName)
{
_context.CreatedStream.ShouldNotBeNull();
await _context.IggyClient.UpdateStreamAsync(
Identifier.Numeric(_context.CreatedStream!.Id), newName);
_context.CreatedStream = await _context.IggyClient.GetStreamByIdAsync(
Identifier.Numeric(_context.CreatedStream.Id));
}

[Then("the stream name should be updated to {string}")]
public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName)
{
_context.CreatedStream.ShouldNotBeNull();
_context.CreatedStream!.Name.ShouldBe(expectedName);
}

[When(@"I delete the stream with name ""(.*)""")]
public async Task WhenIDeleteTheStream(string name)
{
await _context.IggyClient.DeleteStreamAsync(Identifier.String(name));
_context.CreatedStream = null;
}

[Then(@"the stream should be deleted successfully")]
public void ThenTheStreamShouldBeDeletedSuccessfully()
{
_context.CreatedStream.ShouldBeNull();
}
}

// Test context for sharing data between steps
35 changes: 35 additions & 0 deletions foreign/node/src/bdd/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,41 @@ Then(
}
);

When(
'I update the stream name to {string}',
async function (this: TestWorld, newName: string) {
assert.ok(await this.client.stream.update({
streamId: this.stream.id,
name: newName
}));
this.stream = { ...this.stream, name: newName };
}
);

Then(
'the stream name should be updated to {string}',
async function (this: TestWorld, expectedName: string) {
const stream = await this.client.stream.get({ streamId: this.stream.id });
assert.ok(stream, 'Stream should exist after update');
assert.equal(stream!.name, expectedName);
}
);

When(
'I delete the stream with name {string}',
async function (this: TestWorld, name: string) {
assert.ok(await this.client.stream.delete({ streamId: name }));
}
);

Then(
'the stream should be deleted successfully',
async function (this: TestWorld) {
// If we reached here without error, the stream was deleted successfully
assert.ok(true);
}
);

// Cleanup: delete stream after test
Then(
'I can delete stream with ID {int}',
Expand Down
16 changes: 16 additions & 0 deletions foreign/python/apache_iggy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,22 @@ class IggyClient:

Returns Option of stream details or a PyRuntimeError on failure.
"""
def update_stream(
self, stream_id: builtins.str | builtins.int, name: builtins.str
) -> collections.abc.Awaitable[None]:
r"""
Updates a stream's name.

Returns Ok(()) on successful stream update or a PyRuntimeError on failure.
"""
def delete_stream(
self, stream_id: builtins.str | builtins.int
) -> collections.abc.Awaitable[None]:
r"""
Deletes a stream by id.

Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure.
"""
def create_topic(
self,
stream: builtins.str | builtins.int,
Expand Down
Loading
Loading