diff --git a/pom.xml b/pom.xml
index 8203fdb8..22ed5698 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
1.8
false
- 0.9.0.BUILD-SNAPSHOT
+ 1.0.0.RELEASE
2020.0.12
3.21.0
1.33
diff --git a/src/main/java/dev/miku/r2dbc/mysql/Extensions.java b/src/main/java/dev/miku/r2dbc/mysql/Extensions.java
index d04acbc7..06cbb8fe 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/Extensions.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/Extensions.java
@@ -20,7 +20,6 @@
import reactor.util.Logger;
import reactor.util.Loggers;
-import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -108,9 +107,7 @@ static Extensions from(List extensions, boolean autodetect) {
private static List autodetect(List discovered) {
logger.debug("Discovering Extensions using ServiceLoader");
- ServiceLoader extensions = AccessController.doPrivileged(
- (PrivilegedAction>) () ->
- ServiceLoader.load(Extension.class, Extensions.class.getClassLoader()));
+ ServiceLoader extensions = ServiceLoader.load(Extension.class, Extensions.class.getClassLoader());
for (Extension extension : extensions) {
logger.debug(String.format("Registering extension %s", extension.getClass().getName()));
diff --git a/src/main/java/dev/miku/r2dbc/mysql/InsertSyntheticRow.java b/src/main/java/dev/miku/r2dbc/mysql/InsertSyntheticRow.java
index 2415075b..5077f2ee 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/InsertSyntheticRow.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/InsertSyntheticRow.java
@@ -26,7 +26,6 @@
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.Set;
import static dev.miku.r2dbc.mysql.util.AssertUtils.requireNonNull;
@@ -121,11 +120,6 @@ public List getColumnMetadatas() {
return Collections.singletonList(this);
}
- @Override
- @SuppressWarnings("deprecation")
- public Set getColumnNames() {
- return nameSet;
- }
@Override
public MySqlType getType() {
diff --git a/src/main/java/dev/miku/r2dbc/mysql/MySqlResult.java b/src/main/java/dev/miku/r2dbc/mysql/MySqlResult.java
index 72bc4b1e..fe82f286 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/MySqlResult.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/MySqlResult.java
@@ -78,8 +78,8 @@ private MySqlResult(Flux segments) {
}
@Override
- public Mono getRowsUpdated() {
- return segments.handle(ROWS_UPDATED).reduce(SUM);
+ public Mono getRowsUpdated() {
+ return segments.handle(ROWS_UPDATED).reduce(SUM).map(i->(long)i);
}
@Override
diff --git a/src/main/java/dev/miku/r2dbc/mysql/MySqlRowMetadata.java b/src/main/java/dev/miku/r2dbc/mysql/MySqlRowMetadata.java
index 72d96f5f..a49f4bcc 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/MySqlRowMetadata.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/MySqlRowMetadata.java
@@ -104,12 +104,6 @@ public List getColumnMetadatas() {
return InternalArrays.asImmutableList(originMetadata);
}
- @SuppressWarnings("deprecation")
- @Override
- public Set getColumnNames() {
- return nameSet;
- }
-
@Override
public String toString() {
return "MySqlRowMetadata{metadata=" + Arrays.toString(originMetadata) + ", sortedNames=" +
diff --git a/src/main/java/dev/miku/r2dbc/mysql/ParametrizedStatementSupport.java b/src/main/java/dev/miku/r2dbc/mysql/ParametrizedStatementSupport.java
index f8c3c0e1..d3d68e01 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/ParametrizedStatementSupport.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/ParametrizedStatementSupport.java
@@ -111,6 +111,9 @@ public final Flux execute() {
if (bindings.bindings.isEmpty()) {
throw new IllegalStateException("No parameters bound for current statement");
}
+ if (bindings.current == null) {
+ return Flux.error(new IllegalStateException("Trailing add() detected"));
+ }
bindings.validatedFinish();
return Flux.defer(() -> {
diff --git a/src/main/java/dev/miku/r2dbc/mysql/codec/DefaultCodecs.java b/src/main/java/dev/miku/r2dbc/mysql/codec/DefaultCodecs.java
index f4e475f4..45a66ddd 100644
--- a/src/main/java/dev/miku/r2dbc/mysql/codec/DefaultCodecs.java
+++ b/src/main/java/dev/miku/r2dbc/mysql/codec/DefaultCodecs.java
@@ -59,6 +59,9 @@ private DefaultCodecs(Codec>[] codecs) {
List> massiveParamCodecs = new ArrayList<>();
for (Codec> codec : codecs) {
+ if (codec instanceof UsesCodecs) {
+ ((UsesCodecs)codec).setCodecs(this);
+ }
if (codec instanceof PrimitiveCodec>) {
// Primitive codec must be class-based codec, cannot support ParameterizedType.
PrimitiveCodec> c = (PrimitiveCodec>) codec;
@@ -296,7 +299,8 @@ private static Codec>[] defaultCodecs(ByteBufAllocator allocator) {
new BlobCodec(allocator),
new ByteBufferCodec(allocator),
- new ByteArrayCodec(allocator)
+ new ByteArrayCodec(allocator),
+ new ParameterCodec()
};
}
diff --git a/src/main/java/dev/miku/r2dbc/mysql/codec/ParameterCodec.java b/src/main/java/dev/miku/r2dbc/mysql/codec/ParameterCodec.java
new file mode 100644
index 00000000..a601f7af
--- /dev/null
+++ b/src/main/java/dev/miku/r2dbc/mysql/codec/ParameterCodec.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2018-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.miku.r2dbc.mysql.codec;
+
+import dev.miku.r2dbc.mysql.MySqlColumnMetadata;
+import io.netty.buffer.ByteBuf;
+import io.r2dbc.spi.Parameter;
+
+import java.util.BitSet;
+
+/**
+ * Codec for {@link BitSet}.
+ */
+final class ParameterCodec implements Codec< Parameter>, UsesCodecs {
+
+ private Codecs codecs;
+
+ @Override
+ public Parameter decode(ByteBuf value, MySqlColumnMetadata metadata, Class> target, boolean binary, CodecContext context) {
+ throw new UnsupportedOperationException("cannot decode " + Parameter.class);
+ }
+
+ @Override
+ public boolean canDecode(MySqlColumnMetadata metadata, Class> target) {
+ return false;
+ }
+
+ @Override
+ public boolean canEncode(Object value) {
+ return codecs != null && value instanceof Parameter;
+ }
+
+ @Override
+ public dev.miku.r2dbc.mysql.Parameter encode(Object value, CodecContext context) {
+ Parameter p = (Parameter) value;
+ if (p.getValue() == null) {
+ return NullParameter.INSTANCE;
+ }
+ return codecs.encode(p.getValue(), context);
+ }
+
+ public void setCodecs(Codecs codecs) {
+ this.codecs = codecs;
+ }
+}
diff --git a/src/main/java/dev/miku/r2dbc/mysql/codec/UsesCodecs.java b/src/main/java/dev/miku/r2dbc/mysql/codec/UsesCodecs.java
new file mode 100644
index 00000000..1fb98a8c
--- /dev/null
+++ b/src/main/java/dev/miku/r2dbc/mysql/codec/UsesCodecs.java
@@ -0,0 +1,9 @@
+package dev.miku.r2dbc.mysql.codec;
+
+/**
+ * If a codec implement this interface, it will get a
+ * reference to {@link Codecs}
+ */
+public interface UsesCodecs {
+ void setCodecs(Codecs codecs);
+}
diff --git a/src/test/java/dev/miku/r2dbc/mysql/ConnectionIntegrationTest.java b/src/test/java/dev/miku/r2dbc/mysql/ConnectionIntegrationTest.java
index 5db6ad38..b0937806 100644
--- a/src/test/java/dev/miku/r2dbc/mysql/ConnectionIntegrationTest.java
+++ b/src/test/java/dev/miku/r2dbc/mysql/ConnectionIntegrationTest.java
@@ -188,7 +188,7 @@ void batchCrud() {
.thenMany(updateBatch.execute())
.concatMap(r -> Mono.from(r.getRowsUpdated()))
.collectList()
- .doOnNext(updated -> assertThat(updated).isEqualTo(Arrays.asList(2, 3)))
+ .doOnNext(updated -> assertThat(updated).isEqualTo(Arrays.asList(2L, 3L)))
.thenMany(selectBatch.execute())
.concatMap(result -> result.map((row, metadata) -> row.get("value", String.class)))
.collectList()
@@ -197,7 +197,7 @@ void batchCrud() {
.thenMany(deleteBatch.execute())
.concatMap(r -> Mono.from(r.getRowsUpdated()))
.collectList()
- .doOnNext(deleted -> assertThat(deleted).isEqualTo(Arrays.asList(3, 2)))
+ .doOnNext(deleted -> assertThat(deleted).isEqualTo(Arrays.asList(3L, 2L)))
.then();
});
}
diff --git a/src/test/java/dev/miku/r2dbc/mysql/IntegrationTestSupport.java b/src/test/java/dev/miku/r2dbc/mysql/IntegrationTestSupport.java
index 4f4b26a1..9fe9f02c 100644
--- a/src/test/java/dev/miku/r2dbc/mysql/IntegrationTestSupport.java
+++ b/src/test/java/dev/miku/r2dbc/mysql/IntegrationTestSupport.java
@@ -19,6 +19,9 @@
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -29,13 +32,15 @@
import java.util.function.Function;
import java.util.function.Predicate;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.testcontainers.containers.MySQLContainer.MYSQL_PORT;
/**
* Base class considers connection factory and general function for integration tests.
*/
+
abstract class IntegrationTestSupport {
+ static MySQLContainer> mySQLContainer;
private final MySqlConnectionFactory connectionFactory;
IntegrationTestSupport(MySqlConnectionConfiguration configuration) {
@@ -63,24 +68,42 @@ private StepVerifier.FirstStep process(Function super MySqlConnection, P
.as(StepVerifier::create);
}
- static Mono extractRowsUpdated(Result result) {
+ static Mono extractRowsUpdated(Result result) {
return Mono.from(result.getRowsUpdated());
}
static MySqlConnectionConfiguration configuration(boolean autodetectExtensions,
@Nullable ZoneId serverZoneId, @Nullable Predicate preferPrepared) {
+ String user = "root";
String password = System.getProperty("test.mysql.password");
-
- assertThat(password).withFailMessage("Property test.mysql.password must exists and not be empty")
- .isNotNull()
- .isNotEmpty();
+ String databaseName = "r2dbc";
+ String host = "127.0.0.1";
+ int port = MYSQL_PORT;
+
+ // it seems this is used in the CI for now
+ // otherwise we could use an annotation based approach
+ if (password == null && mySQLContainer == null) {
+ mySQLContainer= new MySQLContainer<>(DockerImageName.parse("mysql").withTag("5.7.34"))
+ .withDatabaseName(databaseName)
+ .withUsername(user);
+ mySQLContainer.start();
+ Awaitility.await()
+ .atMost(Duration.ofSeconds(30))
+ .until(mySQLContainer::isRunning);
+ }
+ if (mySQLContainer !=null) {
+ password = mySQLContainer.getPassword();
+ host = mySQLContainer.getHost();
+ port = mySQLContainer.getMappedPort(MYSQL_PORT);
+ }
MySqlConnectionConfiguration.Builder builder = MySqlConnectionConfiguration.builder()
- .host("127.0.0.1")
+ .host(host)
+ .port(port)
.connectTimeout(Duration.ofSeconds(3))
- .user("root")
+ .user(user)
.password(password)
- .database("r2dbc")
+ .database(databaseName)
.autodetectExtensions(autodetectExtensions);
if (serverZoneId != null) {
diff --git a/src/test/java/dev/miku/r2dbc/mysql/JacksonIntegrationTestSupport.java b/src/test/java/dev/miku/r2dbc/mysql/JacksonIntegrationTestSupport.java
index 498800aa..2abd631f 100644
--- a/src/test/java/dev/miku/r2dbc/mysql/JacksonIntegrationTestSupport.java
+++ b/src/test/java/dev/miku/r2dbc/mysql/JacksonIntegrationTestSupport.java
@@ -84,8 +84,12 @@ void json() {
private static Publisher insert(MySqlConnection connection) {
MySqlStatement statement = connection.createStatement("INSERT INTO test VALUES (DEFAULT, ?)");
+
for (Bar bar : BARS) {
- statement.bind(0, bar).add();
+ MySqlStatement bind = statement.bind(0, bar);
+ if (bar!=BARS[BARS.length-1]) {
+ bind.add();
+ }
}
return statement.execute();