|
| 1 | +package com.slack.api.methods; |
| 2 | + |
| 3 | +import com.slack.api.methods.request.chat.ChatAppendStreamRequest; |
| 4 | +import com.slack.api.methods.request.chat.ChatStartStreamRequest; |
| 5 | +import com.slack.api.methods.request.chat.ChatStopStreamRequest; |
| 6 | +import com.slack.api.methods.response.chat.ChatAppendStreamResponse; |
| 7 | +import com.slack.api.methods.response.chat.ChatStartStreamResponse; |
| 8 | +import com.slack.api.methods.response.chat.ChatStopStreamResponse; |
| 9 | +import com.slack.api.model.Message; |
| 10 | +import com.slack.api.model.block.LayoutBlock; |
| 11 | +import lombok.Builder; |
| 12 | +import lombok.Data; |
| 13 | +import lombok.extern.slf4j.Slf4j; |
| 14 | + |
| 15 | +import java.util.List; |
| 16 | +import java.util.concurrent.CompletableFuture; |
| 17 | + |
| 18 | +/** |
| 19 | + * Async variant of {@link ChatStreamHelper} for {@link AsyncMethodsClient}. |
| 20 | + * <p> |
| 21 | + * This helper buffers markdown text and flushes via chat.startStream / chat.appendStream, then finalizes via |
| 22 | + * chat.stopStream. |
| 23 | + * <p> |
| 24 | + * |
| 25 | + */ |
| 26 | +@Data |
| 27 | +@Slf4j |
| 28 | +@Builder |
| 29 | +public class AsyncChatStreamHelper { |
| 30 | + |
| 31 | + public enum State { |
| 32 | + STARTING, |
| 33 | + IN_PROGRESS, |
| 34 | + COMPLETED |
| 35 | + } |
| 36 | + |
| 37 | + private final AsyncMethodsClient client; |
| 38 | + private final String channel; |
| 39 | + private final String threadTs; |
| 40 | + private final String recipientTeamId; |
| 41 | + private final String recipientUserId; |
| 42 | + |
| 43 | + @Builder.Default |
| 44 | + private final int bufferSize = 100; |
| 45 | + |
| 46 | + @Builder.Default |
| 47 | + private StringBuilder buffer = new StringBuilder(); |
| 48 | + @Builder.Default |
| 49 | + private State state = State.STARTING; |
| 50 | + private String streamTs; |
| 51 | + |
| 52 | + /** |
| 53 | + * Append text to the stream. |
| 54 | + * |
| 55 | + * @param markdownText markdown text to append |
| 56 | + * @return a future that completes with a response if the buffer was flushed; completes with null if buffering |
| 57 | + */ |
| 58 | + public CompletableFuture<ChatAppendStreamResponse> append(String markdownText) { |
| 59 | + if (state == State.COMPLETED) { |
| 60 | + CompletableFuture<ChatAppendStreamResponse> f = new CompletableFuture<>(); |
| 61 | + f.completeExceptionally(new SlackChatStreamException("Cannot append to stream: stream state is " + state)); |
| 62 | + return f; |
| 63 | + } |
| 64 | + |
| 65 | + buffer.append(markdownText); |
| 66 | + |
| 67 | + if (buffer.length() >= bufferSize) { |
| 68 | + return flushBuffer(); |
| 69 | + } |
| 70 | + |
| 71 | + if (log.isDebugEnabled()) { |
| 72 | + log.debug("AsyncChatStream appended to buffer: bufferLength={}, bufferSize={}, channel={}, " + |
| 73 | + "recipientTeamId={}, recipientUserId={}, threadTs={}", |
| 74 | + buffer.length(), bufferSize, channel, recipientTeamId, recipientUserId, threadTs); |
| 75 | + } |
| 76 | + return CompletableFuture.completedFuture(null); |
| 77 | + } |
| 78 | + |
| 79 | + public CompletableFuture<ChatStopStreamResponse> stop() { |
| 80 | + return stop(null, null, null); |
| 81 | + } |
| 82 | + |
| 83 | + public CompletableFuture<ChatStopStreamResponse> stop(String markdownText) { |
| 84 | + return stop(markdownText, null, null); |
| 85 | + } |
| 86 | + |
| 87 | + public CompletableFuture<ChatStopStreamResponse> stop( |
| 88 | + String markdownText, |
| 89 | + List<LayoutBlock> blocks, |
| 90 | + Message.Metadata metadata |
| 91 | + ) { |
| 92 | + if (state == State.COMPLETED) { |
| 93 | + CompletableFuture<ChatStopStreamResponse> f = new CompletableFuture<>(); |
| 94 | + f.completeExceptionally(new SlackChatStreamException("Cannot stop stream: stream state is " + state)); |
| 95 | + return f; |
| 96 | + } |
| 97 | + |
| 98 | + if (markdownText != null) { |
| 99 | + buffer.append(markdownText); |
| 100 | + } |
| 101 | + |
| 102 | + CompletableFuture<Void> ensureStarted; |
| 103 | + if (streamTs == null) { |
| 104 | + ensureStarted = client.chatStartStream(ChatStartStreamRequest.builder() |
| 105 | + .channel(channel) |
| 106 | + .threadTs(threadTs) |
| 107 | + .recipientTeamId(recipientTeamId) |
| 108 | + .recipientUserId(recipientUserId) |
| 109 | + .build()) |
| 110 | + .thenApply(startResponse -> { |
| 111 | + if (!startResponse.isOk() || startResponse.getTs() == null) { |
| 112 | + SlackChatStreamException ex = new SlackChatStreamException( |
| 113 | + "Failed to stop stream: stream not started - " + startResponse.getError()); |
| 114 | + ex.setStartResponse(startResponse); |
| 115 | + throw ex; |
| 116 | + } |
| 117 | + streamTs = startResponse.getTs(); |
| 118 | + state = State.IN_PROGRESS; |
| 119 | + return null; |
| 120 | + }); |
| 121 | + } else { |
| 122 | + ensureStarted = CompletableFuture.completedFuture(null); |
| 123 | + } |
| 124 | + |
| 125 | + return ensureStarted.thenCompose(ignored -> client.chatStopStream(ChatStopStreamRequest.builder() |
| 126 | + .channel(channel) |
| 127 | + .ts(streamTs) |
| 128 | + .markdownText(buffer.toString()) |
| 129 | + .blocks(blocks) |
| 130 | + .metadata(metadata) |
| 131 | + .build()) |
| 132 | + .thenApply(resp -> { |
| 133 | + state = State.COMPLETED; |
| 134 | + return resp; |
| 135 | + })); |
| 136 | + } |
| 137 | + |
| 138 | + private CompletableFuture<ChatAppendStreamResponse> flushBuffer() { |
| 139 | + if (streamTs == null) { |
| 140 | + return client.chatStartStream(ChatStartStreamRequest.builder() |
| 141 | + .channel(channel) |
| 142 | + .threadTs(threadTs) |
| 143 | + .recipientTeamId(recipientTeamId) |
| 144 | + .recipientUserId(recipientUserId) |
| 145 | + .markdownText(buffer.toString()) |
| 146 | + .build()) |
| 147 | + .thenApply(startResponse -> { |
| 148 | + if (!startResponse.isOk()) { |
| 149 | + SlackChatStreamException ex = new SlackChatStreamException( |
| 150 | + "Failed to start stream: " + startResponse.getError()); |
| 151 | + ex.setStartResponse(startResponse); |
| 152 | + throw ex; |
| 153 | + } |
| 154 | + streamTs = startResponse.getTs(); |
| 155 | + state = State.IN_PROGRESS; |
| 156 | + ChatAppendStreamResponse synth = new ChatAppendStreamResponse(); |
| 157 | + synth.setOk(startResponse.isOk()); |
| 158 | + synth.setChannel(startResponse.getChannel()); |
| 159 | + synth.setTs(startResponse.getTs()); |
| 160 | + synth.setWarning(startResponse.getWarning()); |
| 161 | + synth.setError(startResponse.getError()); |
| 162 | + buffer.setLength(0); |
| 163 | + return synth; |
| 164 | + }); |
| 165 | + } else { |
| 166 | + return client.chatAppendStream(ChatAppendStreamRequest.builder() |
| 167 | + .channel(channel) |
| 168 | + .ts(streamTs) |
| 169 | + .markdownText(buffer.toString()) |
| 170 | + .build()) |
| 171 | + .thenApply(resp -> { |
| 172 | + if (!resp.isOk()) { |
| 173 | + SlackChatStreamException ex = new SlackChatStreamException( |
| 174 | + "Failed to append to stream: " + resp.getError()); |
| 175 | + ex.getAppendResponses().add(resp); |
| 176 | + throw ex; |
| 177 | + } |
| 178 | + buffer.setLength(0); |
| 179 | + return resp; |
| 180 | + }); |
| 181 | + } |
| 182 | + } |
| 183 | +} |
| 184 | + |
| 185 | + |
0 commit comments