Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.dub
dub.selections.json
hb-ddb-test-library
libhb-ddb.a
docs.json
Expand Down
11 changes: 0 additions & 11 deletions dub.selections.json

This file was deleted.

18 changes: 14 additions & 4 deletions source/ddb/pg/command.d
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import ddb.pg.connection : PGConnection;
import ddb.pg.parameters : PGParameter, PGParameters;
import ddb.pg.resultset : PGResultSet;
import ddb.pg.types;
import ddb.db: DBRow;

@safe:

Expand Down Expand Up @@ -100,6 +101,17 @@ class PGCommand
params.changed = true;
}

void ensure_uprepared()
{
if (prepared)
{
conn.unprepare(preparedName);
preparedName = "";
prepared = false;
params.changed = true;
}
}

/**
Binds values to parameters and updates list of returned fields.

Expand Down Expand Up @@ -185,7 +197,7 @@ class PGCommand
*/
DBRow!Specs executeRow(Specs...)(bool throwIfMoreRows = true)
{
auto result = query!Specs();
auto result = conn.query!Specs();
scope(exit) result.close();
enforce(!result.empty(), "Result doesn't contain any rows.");
auto row = result.front();
Expand Down Expand Up @@ -215,7 +227,7 @@ class PGCommand
*/
T executeScalar(T = Variant)(bool throwIfMoreRows = true)
{
auto result = query!T();
auto result = conn.query!T();
scope(exit) result.close();
enforce(!result.empty(), "Result doesn't contain any rows.");
T row = result.front();
Expand All @@ -229,5 +241,3 @@ class PGCommand

alias scalar = executeScalar;
}


45 changes: 31 additions & 14 deletions source/ddb/pg/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import std.conv : text, to;
import std.exception : enforce;
import std.datetime : Clock, Date, DateTime, UTC;
import std.string : indexOf, lastIndexOf;
import std.range: isRandomAccessRange, ElementType;
import std.traits: Unqual;

import ddb.db : DBRow;
import ddb.pg.exceptions;
Expand Down Expand Up @@ -67,11 +69,7 @@ class PGConnection
// (for the current connection)
string reservePrepared()
{
synchronized (this)
{

return to!string(lastPrepared++);
}
return to!string(lastPrepared++);
}

package(ddb.pg) Message getMessage()
Expand Down Expand Up @@ -140,7 +138,8 @@ class PGConnection
stream.writeCString(password);
}

void sendParseMessage(string statementName, string query, int[] oids)
void sendParseMessage(R)(string statementName, string query, scope R oids)
if (isRandomAccessRange!(Unqual!R) && is(Unqual!(ElementType!R) == PGType))
{
int len = cast(int)(4 + statementName.length + 1 + query.length + 1 + 2 + oids.length * 4);

Expand Down Expand Up @@ -173,25 +172,33 @@ class PGConnection
bool hasText = false;
int paramsLen = params.calcLen(hasText);

int len = cast(int)( 4 + portalName.length + 1 + statementName.length + 1 + (hasText ? (params.length*2) : 2) + 2 + 2 +
params.length * 4 + paramsLen + 2 + 2 );
int len = cast(int)(
4 + // length of the message
portalName.length + 1 + // length of destination portal name + null terminator
statementName.length + 1 + // length of prepared statement name + null terminator
// next we write parameter formats. If all data is binary,
// we simply put int16(1) + int16(1) bytes, if there is
// text mixed in, we write int16(0) + int16(format) for each parameter
2 + (hasText ? (params.length * 2) : 2) +
2 + // 2 bytes for int16(parameter length)
paramsLen + // length of all parameter tuples (length, body)
2 + 2 // int16 number result columns and int16 result-column format code (only one)
);

stream.write(PGRequestMessageTypes.Bind);
stream.write(len);
stream.writeCString(portalName);
stream.writeCString(statementName);
if(hasText)
if (hasText)
{
stream.write(cast(short) params.length);
stream.write(cast(short)params.length);
foreach(param; params)
{
with (PGType)
switch (param.type)
{
case BOOLEAN:
case TIMESTAMP:
case INET:
case NUMERIC:
case JSONB:
case INTERVAL:
case VARCHAR:
Expand All @@ -201,8 +208,10 @@ class PGConnection
default:
stream.write(cast(short) 1); // binary format
}
}
}
} else {
else
{
stream.write(cast(short)1); // one parameter format code
stream.write(cast(short)1); // binary format
}
Expand Down Expand Up @@ -519,7 +528,7 @@ class PGConnection
assert(0);
}

void handleAsync(scope ref Message msg)
void handleAsync(ref Message msg)
{
with (PGResponseMessageTypes)
switch (msg.type)
Expand Down Expand Up @@ -898,6 +907,14 @@ class PGConnection
execute("BEGIN TRANSACTION ISOLATION LEVEL " ~ mode.level ~ " " ~ mode.rwMode ~ ";");
}

/**
Starts transaction of default type. Small benefit in bandwidth.
*/
void begin_default()
{
execute("BEGIN;");
}

/**
Commits a transaction.
Throws: CommitTransactionException if the commit failed.
Expand Down
61 changes: 27 additions & 34 deletions source/ddb/pg/parameters.d
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
module ddb.pg.parameters;

import std.algorithm.sorting : sort;
import std.algorithm.iteration: map;
import std.conv : to;
import std.variant : Variant;

import ddb.pg.stream : PGStream;
Expand All @@ -25,15 +27,15 @@ class PGParameter
return _value;
}
/// ditto
@property Variant value(T)(T v)
@property Variant value(T)(T v) @trusted
{
params.changed = true;
return _value = Variant(v);
}

package(ddb) this(PGParameters params, short index, PGType type)
{
enforce(index > 0, new ParamException("Parameter's index must be > 0"));
enforce!ParamException(index > 0, "Parameter's index must be > 0");
this.params = params;
this.index = index;
this.type = type;
Expand All @@ -43,23 +45,13 @@ class PGParameter
/// Collection of query parameters
class PGParameters
{
private PGParameter[short] params;
private PGParameter[] params;
private PGCommand cmd;
package(ddb) bool changed;

package(ddb) int[] getOids()
package(ddb) auto getOids()
{
short[] keys = () @trusted { return params.keys; }();
sort(keys);

int[] oids = new int[params.length];

foreach (int i, key; keys)
{
oids[i] = params[key].type;
}

return oids;
return map!(a => a.type)(params);
}

///
Expand All @@ -85,42 +77,41 @@ class PGParameters
assert(cmd.executeNonQuery == 1);
---
*/
PGParameter add(short index, PGType type)
PGParameter add(int index, PGType type)
{
enforce(!cmd.prepared, "Can't add parameter to prepared statement.");
changed = true;
return params[index] = new PGParameter(this, index, type);
enforce!ParamException(index == params.length + 1, "Add parameters sequentually, 1, then 2, then 3...");
params ~= new PGParameter(this, to!short(index), type);
return params[index - 1];
}

PGParameters bind(T)(short index, PGType type, T value)
PGParameters bind(T)(int index, PGType type, T value)
{
enforce(!cmd.prepared, "Can't add parameter to prepared statement.");
changed = true;
params[index] = new PGParameter(this, index, type);
params[index].value = value;
enforce!ParamException(index == params.length + 1, "Add parameters sequentually, 1, then 2, then 3...");
params ~= new PGParameter(this, to!short(index), type);
params[index - 1].value = value;
return this;
}


// todo: remove()

PGParameter opIndex(short index)
PGParameter opIndex(int index)
{
return params[index];
return params[index - 1];
}

int opApply(int delegate(ref PGParameter param) @safe dg)
{
int result = 0;

foreach (number; sort(() @trusted { return params.keys; }()))
foreach (p; params)
{
result = dg(params[number]);

result = dg(p);
if (result)
break;
}

return result;
}

Expand All @@ -139,17 +130,19 @@ class PGParameters
{
if (param.value != null)
{
enforce(param.value.convertsTo!T, new ParamException("Parameter's value is not convertible to " ~ T.stringof));
enforce(param.value.convertsTo!T, new ParamException(
"Parameter's value of type " ~ param.value.type.toString ~
" is not convertible to " ~ T.stringof));
paramsLen += len;
}
}

paramsLen += 4; // all param values are preceded by 4-byte size header

with (PGType)
/*final*/ switch (param.type)
{
case BOOLEAN:
checkParam!bool(1);
break;
case BOOLEAN: checkParam!bool(1); break;
case INT2: checkParam!short(2); break;
case INT4: checkParam!int(4); break;
case INT8: checkParam!long(8); break;
Expand Down Expand Up @@ -193,15 +186,15 @@ class PGParameters
{
if (param.value == null)
{
write(-1);
write(-1); // length to -1 as scpecial case, no value is written
continue;
}

with (PGType)
switch (param.type)
{
case BOOLEAN:
write(cast(bool) 1);
write(cast(int)1);
write(param.value.get!bool);
break;
case INT2:
Expand Down
10 changes: 5 additions & 5 deletions source/ddb/pg/parsers.d
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Parses incoming responses from the Postgres Server

@safe:

PGFields parseRowDescription(scope ref Message msg)
PGFields parseRowDescription(ref Message msg)
{
PGField[] fields;
short fieldCount;
Expand Down Expand Up @@ -45,8 +45,8 @@ PGFields parseRowDescription(scope ref Message msg)
return () @trusted { return cast(PGFields)fields; }();
}

auto parseDataRow(Result)(scope ref Message msg, Result result,
scope ref PGFields fields, PGConnection conn)
auto parseDataRow(Result)(ref Message msg, Result result,
ref PGFields fields, PGConnection conn)
{
alias Row = Result.Row;
result.row = conn.fetchRow!(Result._Specs)(msg, fields);
Expand All @@ -59,7 +59,7 @@ auto parseDataRow(Result)(scope ref Message msg, Result result,
return result;
}

void parseReadyForQuery(scope ref Message msg, PGConnection conn)
void parseReadyForQuery(ref Message msg, PGConnection conn)
@trusted
{
enforce(msg.data.length == 1);
Expand All @@ -74,7 +74,7 @@ void parseReadyForQuery(scope ref Message msg, PGConnection conn)
}
}

void parseCommandCompletion(scope ref Message msg, PGConnection conn, out uint oid, ref ulong rowsAffected)
void parseCommandCompletion(ref Message msg, PGConnection conn, out uint oid, ref ulong rowsAffected)
{
import std.string : indexOf, lastIndexOf;

Expand Down
5 changes: 5 additions & 0 deletions source/ddb/pg/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class PGStream
}
}

void write(bool x)
{
write(nativeToBigEndian(x)); // ubyte[]
}

void write(ubyte x)
{
write(nativeToBigEndian(x)); // ubyte[]
Expand Down