diff --git a/.gitignore b/.gitignore index 1cb16fa..4fbfbd1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .dub +dub.selections.json hb-ddb-test-library libhb-ddb.a docs.json diff --git a/dub.selections.json b/dub.selections.json deleted file mode 100644 index 53c5aa4..0000000 --- a/dub.selections.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "fileVersion": 1, - "versions": { - "eventcore": "0.8.3+commit.1.gc242fda", - "libasync": "0.8.2", - "libevent": "2.0.2+2.0.16", - "memutils": "0.4.8", - "taggedalgebraic": "0.10.5", - "vibe-core": "1.0.0" - } -} diff --git a/source/ddb/pg/command.d b/source/ddb/pg/command.d index e4a34d0..a31f071 100644 --- a/source/ddb/pg/command.d +++ b/source/ddb/pg/command.d @@ -10,6 +10,7 @@ import ddb.pg.connection : PGConnection; import ddb.pg.parameters : PGParameter, PGParameters; import ddb.pg.resultset : PGResultSet; import ddb.pg.types; +import ddb.db: DBRow; @safe: @@ -100,6 +101,17 @@ class PGCommand params.changed = true; } + void ensure_uprepared() + { + if (prepared) + { + conn.unprepare(preparedName); + preparedName = ""; + prepared = false; + params.changed = true; + } + } + /** Binds values to parameters and updates list of returned fields. @@ -185,7 +197,7 @@ class PGCommand */ DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true) { - auto result = query!Specs(); + auto result = conn.query!Specs(); scope(exit) result.close(); enforce(!result.empty(), "Result doesn't contain any rows."); auto row = result.front(); @@ -215,7 +227,7 @@ class PGCommand */ T executeScalar(T = Variant)(bool throwIfMoreRows = true) { - auto result = query!T(); + auto result = conn.query!T(); scope(exit) result.close(); enforce(!result.empty(), "Result doesn't contain any rows."); T row = result.front(); @@ -229,5 +241,3 @@ class PGCommand alias scalar = executeScalar; } - - diff --git a/source/ddb/pg/connection.d b/source/ddb/pg/connection.d index 42b9fd7..5106c55 100644 --- a/source/ddb/pg/connection.d +++ b/source/ddb/pg/connection.d @@ -8,6 +8,8 @@ import std.conv : text, to; import std.exception : enforce; import std.datetime : Clock, Date, DateTime, UTC; import std.string : indexOf, lastIndexOf; +import std.range: isRandomAccessRange, ElementType; +import std.traits: Unqual; import ddb.db : DBRow; import ddb.pg.exceptions; @@ -67,11 +69,7 @@ class PGConnection // (for the current connection) string reservePrepared() { - synchronized (this) - { - - return to!string(lastPrepared++); - } + return to!string(lastPrepared++); } package(ddb.pg) Message getMessage() @@ -140,7 +138,8 @@ class PGConnection stream.writeCString(password); } - void sendParseMessage(string statementName, string query, int[] oids) + void sendParseMessage(R)(string statementName, string query, scope R oids) + if (isRandomAccessRange!(Unqual!R) && is(Unqual!(ElementType!R) == PGType)) { int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4); @@ -173,25 +172,33 @@ class PGConnection bool hasText = false; int paramsLen = params.calcLen(hasText); - int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 + - params.length * 4 + paramsLen + 2 + 2 ); + int len = cast(int)( + 4 + // length of the message + portalName.length + 1 + // length of destination portal name + null terminator + statementName.length + 1 + // length of prepared statement name + null terminator + // next we write parameter formats. If all data is binary, + // we simply put int16(1) + int16(1) bytes, if there is + // text mixed in, we write int16(0) + int16(format) for each parameter + 2 + (hasText ? (params.length * 2) : 2) + + 2 + // 2 bytes for int16(parameter length) + paramsLen + // length of all parameter tuples (length, body) + 2 + 2 // int16 number result columns and int16 result-column format code (only one) + ); stream.write(PGRequestMessageTypes.Bind); stream.write(len); stream.writeCString(portalName); stream.writeCString(statementName); - if(hasText) + if (hasText) { - stream.write(cast(short) params.length); + stream.write(cast(short)params.length); foreach(param; params) { with (PGType) switch (param.type) { - case BOOLEAN: case TIMESTAMP: case INET: - case NUMERIC: case JSONB: case INTERVAL: case VARCHAR: @@ -201,8 +208,10 @@ class PGConnection default: stream.write(cast(short) 1); // binary format } + } } - } else { + else + { stream.write(cast(short)1); // one parameter format code stream.write(cast(short)1); // binary format } @@ -519,7 +528,7 @@ class PGConnection assert(0); } - void handleAsync(scope ref Message msg) + void handleAsync(ref Message msg) { with (PGResponseMessageTypes) switch (msg.type) @@ -898,6 +907,14 @@ class PGConnection execute("BEGIN TRANSACTION ISOLATION LEVEL " ~ mode.level ~ " " ~ mode.rwMode ~ ";"); } + /** + Starts transaction of default type. Small benefit in bandwidth. + */ + void begin_default() + { + execute("BEGIN;"); + } + /** Commits a transaction. Throws: CommitTransactionException if the commit failed. diff --git a/source/ddb/pg/parameters.d b/source/ddb/pg/parameters.d index 133a0cc..5f1e214 100644 --- a/source/ddb/pg/parameters.d +++ b/source/ddb/pg/parameters.d @@ -2,6 +2,8 @@ module ddb.pg.parameters; import std.algorithm.sorting : sort; +import std.algorithm.iteration: map; +import std.conv : to; import std.variant : Variant; import ddb.pg.stream : PGStream; @@ -25,7 +27,7 @@ class PGParameter return _value; } /// ditto - @property Variant value(T)(T v) + @property Variant value(T)(T v) @trusted { params.changed = true; return _value = Variant(v); @@ -33,7 +35,7 @@ class PGParameter package(ddb) this(PGParameters params, short index, PGType type) { - enforce(index > 0, new ParamException("Parameter's index must be > 0")); + enforce!ParamException(index > 0, "Parameter's index must be > 0"); this.params = params; this.index = index; this.type = type; @@ -43,23 +45,13 @@ class PGParameter /// Collection of query parameters class PGParameters { - private PGParameter[short] params; + private PGParameter[] params; private PGCommand cmd; package(ddb) bool changed; - package(ddb) int[] getOids() + package(ddb) auto getOids() { - short[] keys = () @trusted { return params.keys; }(); - sort(keys); - - int[] oids = new int[params.length]; - - foreach (int i, key; keys) - { - oids[i] = params[key].type; - } - - return oids; + return map!(a => a.type)(params); } /// @@ -85,42 +77,41 @@ class PGParameters assert(cmd.executeNonQuery == 1); --- */ - PGParameter add(short index, PGType type) + PGParameter add(int index, PGType type) { enforce(!cmd.prepared, "Can't add parameter to prepared statement."); changed = true; - return params[index] = new PGParameter(this, index, type); + enforce!ParamException(index == params.length + 1, "Add parameters sequentually, 1, then 2, then 3..."); + params ~= new PGParameter(this, to!short(index), type); + return params[index - 1]; } - PGParameters bind(T)(short index, PGType type, T value) + PGParameters bind(T)(int index, PGType type, T value) { enforce(!cmd.prepared, "Can't add parameter to prepared statement."); changed = true; - params[index] = new PGParameter(this, index, type); - params[index].value = value; + enforce!ParamException(index == params.length + 1, "Add parameters sequentually, 1, then 2, then 3..."); + params ~= new PGParameter(this, to!short(index), type); + params[index - 1].value = value; return this; } - // todo: remove() - PGParameter opIndex(short index) + PGParameter opIndex(int index) { - return params[index]; + return params[index - 1]; } int opApply(int delegate(ref PGParameter param) @safe dg) { int result = 0; - - foreach (number; sort(() @trusted { return params.keys; }())) + foreach (p; params) { - result = dg(params[number]); - + result = dg(p); if (result) break; } - return result; } @@ -139,17 +130,19 @@ class PGParameters { if (param.value != null) { - enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof)); + enforce(param.value.convertsTo!T, new ParamException( + "Parameter's value of type " ~ param.value.type.toString ~ + " is not convertible to " ~ T.stringof)); paramsLen += len; } } + paramsLen += 4; // all param values are preceded by 4-byte size header + with (PGType) /*final*/ switch (param.type) { - case BOOLEAN: - checkParam!bool(1); - break; + case BOOLEAN: checkParam!bool(1); break; case INT2: checkParam!short(2); break; case INT4: checkParam!int(4); break; case INT8: checkParam!long(8); break; @@ -193,7 +186,7 @@ class PGParameters { if (param.value == null) { - write(-1); + write(-1); // length to -1 as scpecial case, no value is written continue; } @@ -201,7 +194,7 @@ class PGParameters switch (param.type) { case BOOLEAN: - write(cast(bool) 1); + write(cast(int)1); write(param.value.get!bool); break; case INT2: diff --git a/source/ddb/pg/parsers.d b/source/ddb/pg/parsers.d index c7470bc..81a4729 100644 --- a/source/ddb/pg/parsers.d +++ b/source/ddb/pg/parsers.d @@ -14,7 +14,7 @@ Parses incoming responses from the Postgres Server @safe: -PGFields parseRowDescription(scope ref Message msg) +PGFields parseRowDescription(ref Message msg) { PGField[] fields; short fieldCount; @@ -45,8 +45,8 @@ PGFields parseRowDescription(scope ref Message msg) return () @trusted { return cast(PGFields)fields; }(); } -auto parseDataRow(Result)(scope ref Message msg, Result result, - scope ref PGFields fields, PGConnection conn) +auto parseDataRow(Result)(ref Message msg, Result result, + ref PGFields fields, PGConnection conn) { alias Row = Result.Row; result.row = conn.fetchRow!(Result._Specs)(msg, fields); @@ -59,7 +59,7 @@ auto parseDataRow(Result)(scope ref Message msg, Result result, return result; } -void parseReadyForQuery(scope ref Message msg, PGConnection conn) +void parseReadyForQuery(ref Message msg, PGConnection conn) @trusted { enforce(msg.data.length == 1); @@ -74,7 +74,7 @@ void parseReadyForQuery(scope ref Message msg, PGConnection conn) } } -void parseCommandCompletion(scope ref Message msg, PGConnection conn, out uint oid, ref ulong rowsAffected) +void parseCommandCompletion(ref Message msg, PGConnection conn, out uint oid, ref ulong rowsAffected) { import std.string : indexOf, lastIndexOf; diff --git a/source/ddb/pg/stream.d b/source/ddb/pg/stream.d index fa43627..1ca29ce 100644 --- a/source/ddb/pg/stream.d +++ b/source/ddb/pg/stream.d @@ -86,6 +86,11 @@ class PGStream } } + void write(bool x) + { + write(nativeToBigEndian(x)); // ubyte[] + } + void write(ubyte x) { write(nativeToBigEndian(x)); // ubyte[]