|
48 | 48 | <ul>
|
49 | 49 | <li><a class="reference internal" href="#primitive-and-basic-types" id="id4">Primitive and basic types</a></li>
|
50 | 50 | <li><a class="reference internal" href="#json" id="id6">JSON</a></li>
|
51 |
| - <li><a class="reference internal" href="#implementing-custom-serdes" id="id5">Implementing custom serdes</a></li> |
| 51 | + <li><a class="reference internal" href="#window-serdes" id="id7">Window Serdes</a></li> |
| 52 | + <li><a class="reference internal" href="#implementing-custom-serdes" id="id8">Implementing custom serdes</a></li> |
52 | 53 | </ul>
|
53 |
| - <li><a class="reference internal" href="#scala-dsl-serdes" id="id8">Kafka Streams DSL for Scala Implicit Serdes</a></li> |
| 54 | + <li><a class="reference internal" href="#scala-dsl-serdes" id="id9">Kafka Streams DSL for Scala Implicit Serdes</a></li> |
54 | 55 | </ul>
|
55 | 56 | <div class="section" id="configuring-serdes">
|
56 | 57 | <h2>Configuring Serdes<a class="headerlink" href="#configuring-serdes" title="Permalink to this headline"></a></h2>
|
@@ -103,7 +104,7 @@ <h3>Primitive and basic types<a class="headerlink" href="#primitive-and-basic-ty
|
103 | 104 | <pre class="line-numbers"><code class="language-xml"><dependency>
|
104 | 105 | <groupId>org.apache.kafka</groupId>
|
105 | 106 | <artifactId>kafka-clients</artifactId>
|
106 |
| - <version>2.8.0</version> |
| 107 | + <version>{{fullDotVersion}}</version> |
107 | 108 | </dependency></code></pre>
|
108 | 109 | <p>This artifact provides the following serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/clients/src/main/java/org/apache/kafka/common/serialization">org.apache.kafka.common.serialization</a>, which you can leverage when e.g., defining default serializers in your Streams configuration.</p>
|
109 | 110 | <table border="1" class="docutils">
|
@@ -163,6 +164,76 @@ <h3>JSON<a class="headerlink" href="#json" title="Permalink to this headline"></
|
163 | 164 | <p>As shown in the example, you can use JSONSerdes inner classes <code class="docutils literal"><span class="pre">Serdes.serdeFrom(<serializerInstance>, <deserializerInstance>)</span></code> to construct JSON compatible serializers and deserializers.
|
164 | 165 | </p>
|
165 | 166 | </div>
|
| 167 | + <div class="section" id="window-serdes"> |
| 168 | + <h3>Window Serdes<a class="headerlink" href="#window-serdes" title="Permalink to this headline"></a></h3> |
| 169 | + <p>Apache Kafka Streams includes serde implementations for windowed types in |
| 170 | + its <code class="docutils literal"><span class="pre">kafka-streams</span></code> Maven artifact:</p> |
| 171 | + <pre class="line-numbers"><code class="language-xml"><dependency> |
| 172 | + <groupId>org.apache.kafka</groupId> |
| 173 | + <artifactId>kafka-streams</artifactId> |
| 174 | + <version>{{fullDotVersion}}</version> |
| 175 | +</dependency></code></pre> |
| 176 | + <p>This artifact provides the following windowed serde implementations under the package <a class="reference external" href="https://github.com/apache/kafka/blob/{{dotVersion}}/streams/src/main/java/org/apache/kafka/streams/kstream">org.apache.kafka.streams.kstream</a>:</p> |
| 177 | + |
| 178 | + <p><strong>Serdes:</strong></p> |
| 179 | + <ul class="simple"> |
| 180 | + <li><code class="docutils literal"><span class="pre">WindowedSerdes.TimeWindowedSerde<T></span></code></li> |
| 181 | + <li><code class="docutils literal"><span class="pre">WindowedSerdes.SessionWindowedSerde<T></span></code></li> |
| 182 | + </ul> |
| 183 | + |
| 184 | + <p><strong>Serializers:</strong></p> |
| 185 | + <ul class="simple"> |
| 186 | + <li><code class="docutils literal"><span class="pre">TimeWindowedSerializer<T></span></code></li> |
| 187 | + <li><code class="docutils literal"><span class="pre">SessionWindowedSerializer<T></span></code></li> |
| 188 | + </ul> |
| 189 | + |
| 190 | + <p><strong>Deserializers:</strong></p> |
| 191 | + <ul class="simple"> |
| 192 | + <li><code class="docutils literal"><span class="pre">TimeWindowedDeserializer<T></span></code></li> |
| 193 | + <li><code class="docutils literal"><span class="pre">SessionWindowedDeserializer<T></span></code></li> |
| 194 | + </ul> |
| 195 | + <h4>Usage in Code</h4> |
| 196 | + <p>When using windowed serdes in your application code, you typically create instances via constructors or factory methods:</p> |
| 197 | + <pre class="line-numbers"><code class="language-java">// Time windowed serde - using factory method |
| 198 | +Serde<Windowed<String>> timeWindowedSerde = |
| 199 | + WindowedSerdes.timeWindowedSerdeFrom(String.class, 500L); |
| 200 | + |
| 201 | +// Time windowed serde - using constructor |
| 202 | +Serde<Windowed<String>> timeWindowedSerde2 = |
| 203 | + new WindowedSerdes.TimeWindowedSerde<>(Serdes.String(), 500L); |
| 204 | + |
| 205 | +// Session windowed serde - using factory method |
| 206 | +Serde<Windowed<String>> sessionWindowedSerde = |
| 207 | + WindowedSerdes.sessionWindowedSerdeFrom(String.class); |
| 208 | + |
| 209 | +// Session windowed serde - using constructor |
| 210 | +Serde<Windowed<String>> sessionWindowedSerde2 = |
| 211 | + new WindowedSerdes.SessionWindowedSerde<>(Serdes.String()); |
| 212 | + |
| 213 | +// Using individual serializers/deserializers |
| 214 | +TimeWindowedSerializer<String> serializer = new TimeWindowedSerializer<>(Serdes.String().serializer()); |
| 215 | +TimeWindowedDeserializer<String> deserializer = new TimeWindowedDeserializer<>(Serdes.String().deserializer(), 500L);</code></pre> |
| 216 | + |
| 217 | + <h4>Usage in Command Line</h4> |
| 218 | + <p>When using command-line tools (like <code>bin/kafka-console-consumer.sh</code>), you can configure windowed deserializers by passing the inner class and window size via configuration properties. The property names use a prefix pattern:</p> |
| 219 | + <pre class="line-numbers"><code class="language-bash"># Time windowed deserializer configuration |
| 220 | +--property print.key=true \ |
| 221 | +--property key.deserializer=org.apache.kafka.streams.kstream.TimeWindowedDeserializer \ |
| 222 | +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer \ |
| 223 | +--property key.deserializer.window.size.ms=500 |
| 224 | + |
| 225 | +# Session windowed deserializer configuration |
| 226 | +--property print.key=true \ |
| 227 | +--property key.deserializer=org.apache.kafka.streams.kstream.SessionWindowedDeserializer \ |
| 228 | +--property key.deserializer.windowed.inner.deserializer.class=org.apache.kafka.common.serialization.StringDeserializer</code></pre> |
| 229 | + |
| 230 | + <h4>Deprecated Configs</h4> |
| 231 | + <p>The following <code>StreamsConfig</code> parameters are deprecated in favor of passing parameters directly to serializer/deserializer constructors:</p> |
| 232 | + <ul class="simple"> |
| 233 | + <li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOWED_INNER_CLASS_SERDE</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedSerializer.WINDOWED_INNER_SERIALIZER_CLASS</span></code> and <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOWED_INNER_DESERIALIZER_CLASS</span></code></li> |
| 234 | + <li><code class="docutils literal"><span class="pre">StreamsConfig.WINDOW_SIZE_MS_CONFIG</span></code> is deprecated in favor of <code class="docutils literal"><span class="pre">TimeWindowedDeserializer.WINDOW_SIZE_MS_CONFIG</span></code></li> |
| 235 | + </ul> |
| 236 | + </div> |
166 | 237 | <div class="section" id="implementing-custom-serdes">
|
167 | 238 | <span id="streams-developer-guide-serdes-custom"></span><h2>Implementing custom Serdes<a class="headerlink" href="#implementing-custom-serdes" title="Permalink to this headline"></a></h2>
|
168 | 239 | <p>If you need to implement custom Serdes, your best starting point is to take a look at the source code references of
|
|
0 commit comments