Skip to content
This repository was archived by the owner on Dec 13, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<java.version>1.8</java.version>
<maven.surefire.skip>false</maven.surefire.skip>

<r2dbc-spi.version>0.9.0.BUILD-SNAPSHOT</r2dbc-spi.version>
<r2dbc-spi.version>1.0.0.RELEASE</r2dbc-spi.version>
<reactor.version>2020.0.12</reactor.version>
<assertj.version>3.21.0</assertj.version>
<jmh.version>1.33</jmh.version>
Expand Down
5 changes: 1 addition & 4 deletions src/main/java/dev/miku/r2dbc/mysql/Extensions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import reactor.util.Logger;
import reactor.util.Loggers;

import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -108,9 +107,7 @@ static Extensions from(List<Extension> extensions, boolean autodetect) {
private static List<Extension> autodetect(List<Extension> discovered) {
logger.debug("Discovering Extensions using ServiceLoader");

ServiceLoader<Extension> extensions = AccessController.doPrivileged(
(PrivilegedAction<ServiceLoader<Extension>>) () ->
ServiceLoader.load(Extension.class, Extensions.class.getClassLoader()));
ServiceLoader<Extension> extensions = ServiceLoader.load(Extension.class, Extensions.class.getClassLoader());

for (Extension extension : extensions) {
logger.debug(String.format("Registering extension %s", extension.getClass().getName()));
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/dev/miku/r2dbc/mysql/InsertSyntheticRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;

import static dev.miku.r2dbc.mysql.util.AssertUtils.requireNonNull;

Expand Down Expand Up @@ -121,11 +120,6 @@ public List<ColumnMetadata> getColumnMetadatas() {
return Collections.singletonList(this);
}

@Override
@SuppressWarnings("deprecation")
public Set<String> getColumnNames() {
return nameSet;
}

@Override
public MySqlType getType() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/dev/miku/r2dbc/mysql/MySqlResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private MySqlResult(Flux<Segment> segments) {
}

@Override
public Mono<Integer> getRowsUpdated() {
return segments.handle(ROWS_UPDATED).reduce(SUM);
public Mono<Long> getRowsUpdated() {
return segments.handle(ROWS_UPDATED).reduce(SUM).map(i->(long)i);
}

@Override
Expand Down
6 changes: 0 additions & 6 deletions src/main/java/dev/miku/r2dbc/mysql/MySqlRowMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,6 @@ public List<MySqlColumnDescriptor> getColumnMetadatas() {
return InternalArrays.asImmutableList(originMetadata);
}

@SuppressWarnings("deprecation")
@Override
public Set<String> getColumnNames() {
return nameSet;
}

@Override
public String toString() {
return "MySqlRowMetadata{metadata=" + Arrays.toString(originMetadata) + ", sortedNames=" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public final Flux<MySqlResult> execute() {
if (bindings.bindings.isEmpty()) {
throw new IllegalStateException("No parameters bound for current statement");
}
if (bindings.current == null) {
return Flux.error(new IllegalStateException("Trailing add() detected"));
}
bindings.validatedFinish();

return Flux.defer(() -> {
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/dev/miku/r2dbc/mysql/codec/DefaultCodecs.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ private DefaultCodecs(Codec<?>[] codecs) {
List<MassiveParametrizedCodec<?>> massiveParamCodecs = new ArrayList<>();

for (Codec<?> codec : codecs) {
if (codec instanceof UsesCodecs) {
((UsesCodecs)codec).setCodecs(this);
}
if (codec instanceof PrimitiveCodec<?>) {
// Primitive codec must be class-based codec, cannot support ParameterizedType.
PrimitiveCodec<?> c = (PrimitiveCodec<?>) codec;
Expand Down Expand Up @@ -296,7 +299,8 @@ private static Codec<?>[] defaultCodecs(ByteBufAllocator allocator) {
new BlobCodec(allocator),

new ByteBufferCodec(allocator),
new ByteArrayCodec(allocator)
new ByteArrayCodec(allocator),
new ParameterCodec()
};
}

Expand Down
59 changes: 59 additions & 0 deletions src/main/java/dev/miku/r2dbc/mysql/codec/ParameterCodec.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.miku.r2dbc.mysql.codec;

import dev.miku.r2dbc.mysql.MySqlColumnMetadata;
import io.netty.buffer.ByteBuf;
import io.r2dbc.spi.Parameter;

import java.util.BitSet;

/**
* Codec for {@link BitSet}.
*/
final class ParameterCodec implements Codec< Parameter>, UsesCodecs {

private Codecs codecs;

@Override
public Parameter decode(ByteBuf value, MySqlColumnMetadata metadata, Class<?> target, boolean binary, CodecContext context) {
throw new UnsupportedOperationException("cannot decode " + Parameter.class);
}

@Override
public boolean canDecode(MySqlColumnMetadata metadata, Class<?> target) {
return false;
}

@Override
public boolean canEncode(Object value) {
return codecs != null && value instanceof Parameter;
}

@Override
public dev.miku.r2dbc.mysql.Parameter encode(Object value, CodecContext context) {
Parameter p = (Parameter) value;
if (p.getValue() == null) {
return NullParameter.INSTANCE;
}
return codecs.encode(p.getValue(), context);
}

public void setCodecs(Codecs codecs) {
this.codecs = codecs;
}
}
9 changes: 9 additions & 0 deletions src/main/java/dev/miku/r2dbc/mysql/codec/UsesCodecs.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package dev.miku.r2dbc.mysql.codec;

/**
* If a codec implement this interface, it will get a
* reference to {@link Codecs}
*/
public interface UsesCodecs {
void setCodecs(Codecs codecs);
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ void batchCrud() {
.thenMany(updateBatch.execute())
.concatMap(r -> Mono.from(r.getRowsUpdated()))
.collectList()
.doOnNext(updated -> assertThat(updated).isEqualTo(Arrays.asList(2, 3)))
.doOnNext(updated -> assertThat(updated).isEqualTo(Arrays.asList(2L, 3L)))
.thenMany(selectBatch.execute())
.concatMap(result -> result.map((row, metadata) -> row.get("value", String.class)))
.collectList()
Expand All @@ -197,7 +197,7 @@ void batchCrud() {
.thenMany(deleteBatch.execute())
.concatMap(r -> Mono.from(r.getRowsUpdated()))
.collectList()
.doOnNext(deleted -> assertThat(deleted).isEqualTo(Arrays.asList(3, 2)))
.doOnNext(deleted -> assertThat(deleted).isEqualTo(Arrays.asList(3L, 2L)))
.then();
});
}
Expand Down
41 changes: 32 additions & 9 deletions src/test/java/dev/miku/r2dbc/mysql/IntegrationTestSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand All @@ -29,13 +32,15 @@
import java.util.function.Function;
import java.util.function.Predicate;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testcontainers.containers.MySQLContainer.MYSQL_PORT;

/**
* Base class considers connection factory and general function for integration tests.
*/

abstract class IntegrationTestSupport {

static MySQLContainer<?> mySQLContainer;
private final MySqlConnectionFactory connectionFactory;

IntegrationTestSupport(MySqlConnectionConfiguration configuration) {
Expand Down Expand Up @@ -63,24 +68,42 @@ private StepVerifier.FirstStep<Void> process(Function<? super MySqlConnection, P
.as(StepVerifier::create);
}

static Mono<Integer> extractRowsUpdated(Result result) {
static Mono<Long> extractRowsUpdated(Result result) {
return Mono.from(result.getRowsUpdated());
}

static MySqlConnectionConfiguration configuration(boolean autodetectExtensions,
@Nullable ZoneId serverZoneId, @Nullable Predicate<String> preferPrepared) {
String user = "root";
String password = System.getProperty("test.mysql.password");

assertThat(password).withFailMessage("Property test.mysql.password must exists and not be empty")
.isNotNull()
.isNotEmpty();
String databaseName = "r2dbc";
String host = "127.0.0.1";
int port = MYSQL_PORT;

// it seems this is used in the CI for now
// otherwise we could use an annotation based approach
if (password == null && mySQLContainer == null) {
mySQLContainer= new MySQLContainer<>(DockerImageName.parse("mysql").withTag("5.7.34"))
.withDatabaseName(databaseName)
.withUsername(user);
mySQLContainer.start();
Awaitility.await()
.atMost(Duration.ofSeconds(30))
.until(mySQLContainer::isRunning);
}
if (mySQLContainer !=null) {
password = mySQLContainer.getPassword();
host = mySQLContainer.getHost();
port = mySQLContainer.getMappedPort(MYSQL_PORT);
}

MySqlConnectionConfiguration.Builder builder = MySqlConnectionConfiguration.builder()
.host("127.0.0.1")
.host(host)
.port(port)
.connectTimeout(Duration.ofSeconds(3))
.user("root")
.user(user)
.password(password)
.database("r2dbc")
.database(databaseName)
.autodetectExtensions(autodetectExtensions);

if (serverZoneId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ void json() {
private static Publisher<MySqlResult> insert(MySqlConnection connection) {
MySqlStatement statement = connection.createStatement("INSERT INTO test VALUES (DEFAULT, ?)");


for (Bar bar : BARS) {
statement.bind(0, bar).add();
MySqlStatement bind = statement.bind(0, bar);
if (bar!=BARS[BARS.length-1]) {
bind.add();
}
}

return statement.execute();
Expand Down