1 module postgresql.link; 2 3 import postgresql.core; 4 import postgresql.value; 5 import postgresql.protocol; 6 import postgresql.utils; 7 8 class Link: ILink 9 { 10 import std.conv: to; 11 import std..string: toStringz, fromStringz; 12 import std.uuid: randomUUID; 13 import std.algorithm: countUntil; 14 import std.typecons: RefCounted, RefCountedAutoInitialize; 15 import core.time: Duration, seconds; 16 17 import postgresql.protocol.stream; 18 19 private 20 { 21 static struct Data 22 { 23 @disable this(this); 24 URI uri; 25 PGStream stream; 26 string error; 27 string[char] errorData; 28 string[string] params; 29 uint pid; 30 uint key; 31 string statement; 32 string statementId; 33 34 Field[] fields; 35 size_t row; 36 37 ~this() 38 { 39 // TODO: Cleanup 40 detach(); 41 } 42 43 void detach() 44 nothrow @safe { 45 stream.close(); 46 } 47 48 //void statementIdClear() 49 //{ 50 // if (!statementId.length) 51 // { 52 // return; 53 // } 54 55 // exec("DEALLOCATE " ~ statementId); 56 57 // statementId.length = 0; 58 //} 59 60 //void statementIdGen() 61 //{ 62 // _.statementId = randomUUID().to!string; 63 //} 64 } 65 66 RefCounted!Data _; 67 } 68 69 @property URI uri() 70 { 71 return _.uri; 72 } 73 74 @property string statement() 75 { 76 return _.statement; 77 } 78 79 @property string error() 80 { 81 return _.error; 82 } 83 84 @property bool isAttached() 85 { 86 return _.stream.isConnected; 87 } 88 89 this(in string uri) 90 { 91 string tmp; 92 parseURI(uri, _.uri, tmp); 93 } 94 95 ~this() 96 { 97 detach(); 98 } 99 100 bool attach(bool wait = true) 101 { 102 import core.time: seconds; 103 // TODO: Already connected 104 105 import std.socket : InternetAddress; 106 import postgresql.result.array: ResultArray; 107 108 bool result; 109 110 scope(exit) 111 { 112 _debug("%s -> %s", __FUNCTION__, result); 113 } 114 115 //if (isOpen) 116 //{ 117 // // _debug("Database.open (Already open, closing)"); 118 // // close(); 119 // //} 120 _.stream = pgConnect(uri.host, uri.port); 121 122 if (!_.stream.isConnected) 123 { 124 detach(); 125 return result; 126 } 127 128 Msg msg; 129 130 if (!_.stream.sendStartupMsg([ 131 "user": uri.username, 132 "database": uri.database, 133 ])) 134 { 135 detach(); 136 return result; 137 } 138 139 result = _fetchReply!ResultArray(null); 140 141 if (!result) 142 { 143 detach(); 144 } 145 return result; 146 } 147 148 void detach() 149 { 150 _.detach(); 151 } 152 153 /** 154 * Call stored procedure 155 */ 156 bool perform(A ...)(in string name, A args) 157 { 158 return perform(name, valueArray(args)); 159 } 160 /// 161 bool perform(in string name, Value[] args = null) 162 { 163 import std.array; 164 import std.format; 165 166 auto tmp = appender!string; 167 168 formattedWrite(tmp, "SELECT * FROM %s(", name); 169 170 foreach (i; 1 .. args.length + 1) 171 { 172 if (i != 1) 173 { 174 tmp.put(','); 175 } 176 formattedWrite(tmp, "$%s", i); 177 } 178 179 tmp.put(")"); 180 return exec(tmp.data, args); 181 } 182 /** 183 * Call stored procedure 184 */ 185 bool call(T, A ...)(ref T result, in string name, A args) 186 if (isResult!T) 187 { 188 return call(result, name, valueArray(args)); 189 } 190 /// 191 bool call(T)(ref T result, in string name, Value[] args = null) 192 if (isResult!T) 193 { 194 import std.array; 195 import std.format; 196 197 auto tmp = appender!string; 198 199 formattedWrite(tmp, "SELECT * FROM %s(", name); 200 201 foreach (i; 1 .. args.length + 1) 202 { 203 if (i != 1) 204 { 205 tmp.put(','); 206 } 207 formattedWrite(tmp, "$%s", i); 208 } 209 210 tmp.put(")"); 211 return query(result, tmp.data, args); 212 } 213 214 bool exec(in string stmt, Value[] args = null) 215 { 216 import postgresql.result.array: ResultArray; 217 return _exec!ResultArray(stmt, args); 218 } 219 220 bool query(T, A ...)(ref T result, in string stmt, A args) 221 if (isResult!T) 222 { 223 return query(result, stmt, valueArray(args)); 224 } 225 226 bool query(T)(ref T result, in string stmt, Value[] args = null) 227 if (isResult!T) 228 { 229 return _exec(stmt, args, &result); 230 } 231 232 private void clear() 233 { 234 _.fields.length = 0; 235 _.row = 0; 236 //queryIdClear(); 237 } 238 239 private void _pack(in Value v, ref ubyte[] dst) 240 { 241 if (v.type == Value.Type.bool_) 242 { 243 dst = [v.to!bool ? 't' : 'f']; 244 } 245 else if (v.type != Value.Type.null_) 246 { 247 dst = cast(ubyte[]) v.to!string; 248 } 249 else 250 { 251 dst.length = 0; 252 } 253 } 254 255 private bool _exec(T)(in string stmt, Value[] args, T *result = null) 256 { 257 clear(); 258 259 _.statement = stmt.dup; 260 261 bool _ok; 262 string[] _args; 263 264 if (result !is null) 265 { 266 result.reset(); 267 } 268 269 scope(exit) 270 { 271 _debug("%s: %s -> %s", __FUNCTION__, stmt, _ok); 272 } 273 274 foreach(c; match(_.statement, regexParam)) 275 { 276 if (0 > _args.countUntil(c[0])) 277 { 278 _args ~= c[0].dup; 279 } 280 } 281 282 if (_args.length) 283 { 284 if (_args.length != args.length) 285 { 286 //error(DbError.Type.query, "Invalid parameters count"); 287 return false; 288 } 289 290 PGType[] _types; 291 Param[] _params; 292 293 foreach (i, ref k; _args) 294 { 295 auto _arg = args[i]; 296 297 Param _param; 298 299 if (_arg.type == Value.Type.null_) { 300 _param.size = -1; 301 } else { 302 _pack(_arg, _param.data); 303 _param.size = cast(int) _param.data.length; 304 } 305 306 _params ~= _param; 307 } 308 309 if (!( 310 _.stream.sendParseMsg("", _.statement, _types) 311 && _.stream.sendFlushMsg 312 && _fetchReply(result, ReplyMsgType.parseComplete) 313 && _.stream.sendCloseMsg(NameOf.portal, "") 314 && _.stream.sendBindMsg("", "", _params) 315 && _.stream.sendDescribeMsg(NameOf.portal, "") 316 && _.stream.sendFlushMsg 317 && _fetchReply(result, ReplyMsgType.rowDescription) 318 && _.stream.sendExecuteMsg("", 0) 319 && _.stream.sendSyncMsg 320 && _.stream.sendFlushMsg 321 )) 322 { 323 detach(); 324 return false; 325 } 326 327 _ok = _fetchReply(result, ReplyMsgType.readyForQuery); 328 } 329 else 330 { 331 if (!_.stream.sendQueryMsg(_.statement)) 332 { 333 detach(); 334 335 _debug("%s: %s -> %s", __FUNCTION__, stmt, _.errorData); 336 337 return false; 338 } 339 340 _ok = _fetchReply(result); 341 } 342 343 return _ok; 344 } 345 346 private bool _fetchReply(T)(T *result = null, ReplyMsgType stopType = ReplyMsgType.readyForQuery) 347 { 348 import std.bitmanip: bigEndianToNative; 349 350 do 351 { 352 auto msg = _.stream.readNext(true); 353 354 _debug("MSG: %s", msg); 355 356 switch(msg.type) 357 { 358 case ReplyMsgType.errorResponse: 359 _.errorData = unpackError(msg.data); 360 _.error = _.errorData.get('M', ""); 361 _debug("Error: %s", _.errorData); 362 return false; 363 case ReplyMsgType.noticeResponse: // TODO: 364 _debug("Notice: %s", msg); 365 break; 366 case ReplyMsgType.authentication: 367 bool authOk; 368 switch(unpackAuth(msg.data)) 369 { 370 default: 371 break; 372 case AuthReply.ok: 373 authOk = true; 374 break; 375 case AuthReply.md5Password: 376 auto tkn = "md5" ~ md5HexString(md5HexString(uri.password, uri.username), msg.data); 377 authOk = _.stream.sendPasswordMsg(cast(ubyte[]) tkn); 378 if (authOk) 379 { 380 _.stream.waitForData; 381 } 382 break; 383 } 384 if (!authOk) 385 { 386 return false; 387 } 388 break; 389 case ReplyMsgType.parameterStatus: 390 foreach(ref k, ref v; unpackMap(msg.data)) 391 { 392 _.params[k] = v; 393 } 394 break; 395 case ReplyMsgType.backendKeyData: 396 unpackBackendKey(msg.data, _.pid, _.key); 397 break; 398 case ReplyMsgType.rowDescription: 399 if (result !is null) 400 { 401 unpackRowDescription(msg.data, _.fields); 402 403 result.cols = cast(uint) _.fields.length; 404 405 _debug("FIELDS: %s", _.fields); 406 407 result.fields = _.fields; 408 } 409 break; 410 case ReplyMsgType.dataRow: 411 if (result !is null) 412 { 413 scope(exit) 414 { 415 result.rowEnd(_.row); 416 _.row++; 417 } 418 419 result.rowBegin(_.row); 420 421 auto data = msg.data[]; 422 423 auto fieldsCount = data[0 .. 2].bigEndianToNative!ushort; 424 425 data = data[2 .. $]; 426 427 foreach(col; 0 .. fieldsCount) 428 { 429 auto len = data[0 .. 4].bigEndianToNative!int; 430 data = data[4 .. $]; 431 432 Value v; 433 v.tag = _.fields[col].typeOid; 434 435 if (-1 == len) 436 { 437 result.dataAdd(_.row, col, v); 438 continue; 439 } 440 441 auto val = cast(string) data[0 .. len].dup; 442 443 scope(exit) 444 { 445 data = data[len .. $]; 446 } 447 448 import std..string: split; 449 450 switch (v.tag) with (PGType) 451 { 452 case CHAROID, VARCHAROID, TEXTOID: 453 default: 454 v = Value(val); 455 break; 456 case BOOLOID: 457 v = Value(val == "t"); 458 break; 459 case INT2OID: 460 case INT4OID: 461 v = Value(val.to!int); 462 break; 463 case INT8OID: 464 v = Value(val.to!long); 465 break; 466 case FLOAT4OID: 467 case FLOAT8OID: 468 v = Value(val.to!double); 469 break; 470 case NUMERICOID: 471 v = Value(val.to!real); 472 break; 473 case DATEOID: 474 // FIXME: 475 //v = Value(Date.fromISOExtString(val)); 476 v = Value(val); 477 break; 478 case TIMESTAMPOID: 479 // FIXME: 480 //v = Value(SysTime.fromISOExtString(str.replace(" ", "T"))); 481 v = Value(val); 482 break; 483 case TIMESTAMPTZOID: 484 // FIXME: 485 //v = Value(SysTime.fromISOExtString(str.replace(" ", "T"))); 486 v = Value(val); 487 break; 488 case TEXTARRAYOID: 489 v = Value(val[1 .. $ - 1].split(",")); 490 break; 491 case JSONOID: 492 case JSONBOID: 493 v = fromJsonString(val); 494 break; 495 } 496 result.dataAdd(_.row, col, v); 497 } 498 } 499 break; 500 case ReplyMsgType.parseComplete: 501 case ReplyMsgType.bindComplete: 502 case ReplyMsgType.closeComplete: 503 case ReplyMsgType.commandComplete: 504 // TODO: 505 break; 506 case ReplyMsgType.readyForQuery: 507 break; 508 default: 509 _debug("Unhandled message: %s", msg); 510 } 511 512 if (msg.type == stopType) 513 { 514 return true; 515 } 516 517 } while (_.stream.dataAvailableForRead); 518 519 return false; 520 } 521 }