1 module postgresql.link;
2 
3 import postgresql.core;
4 import postgresql.value;
5 import postgresql.protocol;
6 import postgresql.utils;
7 
8 class Link: ILink
9 {
    import std.conv: to;
    import std.string: toStringz, fromStringz;
    import std.uuid: randomUUID;
    import std.algorithm: countUntil;
    import std.typecons: RefCounted, RefCountedAutoInitialize;
    import core.time: Duration, seconds;
16 
17     import postgresql.protocol.stream;
18 
    private
    {
        /**
         * Connection state held behind the `RefCounted` handle `_`.
         *
         * Postblit is disabled, so the payload cannot be copied; its
         * destructor closes the stream.
         */
        static struct Data
        {
            @disable this(this);
            URI            uri;         /// Parsed connection URI (filled by `parseURI` in the constructor).
            PGStream       stream;      /// Stream returned by `pgConnect` in `attach`.
            string         error;       /// 'M' entry of the last error reply, or "" when absent.
            string[char]   errorData;   /// Last error reply as unpacked by `unpackError`, keyed by a single character.
            string[string] params;      /// Key/value pairs collected from parameterStatus replies.
            uint           pid;         /// Filled by `unpackBackendKey` from the backendKeyData reply.
            uint           key;         /// Filled by `unpackBackendKey` from the backendKeyData reply.
            string         statement;   /// Copy of the statement text last passed to `_exec`.
            string         statementId; /// Only referenced by the disabled helpers below.

            Field[]        fields;      /// Column descriptions unpacked from the last rowDescription reply.
            size_t         row;         /// Index of the next data row handed to the result sink.

            /// Closes the stream when the payload is destroyed.
            ~this()
            {
                // TODO: Cleanup
                detach();
            }

            /// Closes the underlying stream.
            void detach()
            nothrow @safe {
                stream.close();
            }

            // Disabled helpers for named prepared statements, kept for reference.

            //void statementIdClear()
            //{
            //    if (!statementId.length)
            //    {
            //        return;
            //    }

            //    exec("DEALLOCATE " ~ statementId);

            //    statementId.length = 0;
            //}

            //void statementIdGen()
            //{
            //    _.statementId = randomUUID().to!string;
            //}
        }

        /// Shared connection state; every method of `Link` goes through this handle.
        RefCounted!Data _;
    }
68 
69     @property URI uri()
70     {
71         return _.uri;
72     }
73 
74     @property string statement()
75     {
76         return _.statement;
77     }
78 
79     @property string error()
80     {
81         return _.error;
82     }
83 
84     @property bool isAttached()
85     {
86         return _.stream.isConnected;
87     }
88 
    /**
     * Creates a link for the given connection URI.
     *
     * Only parses the URI into the shared state; the connection itself is
     * opened by `attach`.
     *
     * Params:
     *   uri = connection string handed to `parseURI`
     */
    this(in string uri)
    {
        // NOTE(review): presumably an output of parseURI that this class has
        // no use for; what it receives is not visible from here, and any
        // result of parseURI itself is ignored - confirm that is intended.
        string tmp;
        parseURI(uri, _.uri, tmp);
    }
94 
    /// Closes the stream (via `detach`) when the link object is destroyed.
    ~this()
    {
        detach();
    }
99 
100     bool attach(bool wait = true)
101     {
102         import core.time: seconds;
103         // TODO: Already connected
104 
105         import std.socket : InternetAddress;
106         import postgresql.result.array: ResultArray;
107 
108         bool result;
109 
110         scope(exit)
111         {
112             _debug("%s -> %s", __FUNCTION__, result);
113         }
114 
115         //if (isOpen)
116         //{
117 //                //    _debug("Database.open (Already open, closing)");
118 //                //    close();
119 //                //}
120         _.stream = pgConnect(uri.host, uri.port);
121 
122         if (!_.stream.isConnected)
123         {
124             detach();
125             return result;
126         }
127 
128         Msg msg;
129 
130         if (!_.stream.sendStartupMsg([
131                 "user": uri.username,
132                 "database": uri.database,
133             ]))
134         {
135             detach();
136             return result;
137         }
138 
139         result = _fetchReply!ResultArray(null);
140 
141         if (!result)
142         {
143             detach();
144         }
145         return result;
146     }
147 
    /// Closes the connection by delegating to the shared state's `detach`.
    void detach()
    {
        _.detach();
    }
152 
153     /**
154     *  Call stored procedure
155     */
156     bool perform(A ...)(in string name, A args)
157     {
158         return perform(name, valueArray(args));
159     }
160     ///
161     bool perform(in string name, Value[] args = null)
162     {
163         import std.array;
164         import std.format;
165 
166         auto tmp = appender!string;
167 
168         formattedWrite(tmp, "SELECT * FROM %s(", name);
169 
170         foreach (i; 1 .. args.length + 1)
171         {
172             if (i != 1)
173             {
174                 tmp.put(',');
175             }
176             formattedWrite(tmp, "$%s", i);
177         }
178 
179         tmp.put(")");
180         return exec(tmp.data, args);
181     }
182     /**
183     *  Call stored procedure
184     */
185     bool call(T, A ...)(ref T result, in string name, A args)
186     if (isResult!T)
187     {
188         return call(result, name, valueArray(args));
189     }
190     ///
191     bool call(T)(ref T result, in string name, Value[] args = null)
192     if (isResult!T)
193     {
194         import std.array;
195         import std.format;
196 
197         auto tmp = appender!string;
198 
199         formattedWrite(tmp, "SELECT * FROM %s(", name);
200 
201         foreach (i; 1 .. args.length + 1)
202         {
203             if (i != 1)
204             {
205                 tmp.put(',');
206             }
207             formattedWrite(tmp, "$%s", i);
208         }
209 
210         tmp.put(")");
211         return query(result, tmp.data, args);
212     }
213 
    /**
     * Executes a statement without collecting its rows.
     *
     * Forwards to `_exec` with no result sink, so row descriptions and data
     * rows in the reply are skipped.
     *
     * Params:
     *   stmt = statement text
     *   args = values for the statement's placeholders, if any
     *
     * Returns: the outcome reported by `_exec`
     */
    bool exec(in string stmt, Value[] args = null)
    {
        // ResultArray only fixes the template parameter; no instance is created.
        import postgresql.result.array: ResultArray;
        return _exec!ResultArray(stmt, args);
    }
219 
    /**
     * Runs `stmt` with the given arguments (converted by `valueArray`) and
     * stores the reply in `result`.
     *
     * Returns: the outcome of the `Value[]` overload
     */
    bool query(T, A ...)(ref T result, in string stmt, A args)
    if (isResult!T)
    {
        return query(result, stmt, valueArray(args));
    }

    /**
     * Runs `stmt` and stores the reply in `result`.
     *
     * Params:
     *   result = sink handed to `_exec` by address
     *   stmt   = statement text
     *   args   = values for the statement's placeholders, if any
     *
     * Returns: the outcome reported by `_exec`
     */
    bool query(T)(ref T result, in string stmt, Value[] args = null)
    if (isResult!T)
    {
        return _exec(stmt, args, &result);
    }
231 
    /// Resets the per-statement state: column descriptions and the row counter.
    private void clear()
    {
        _.fields.length = 0;
        _.row = 0;
        //queryIdClear();
    }
238 
239     private void _pack(in Value v, ref ubyte[] dst)
240     {
241         if (v.type == Value.Type.bool_)
242         {
243             dst = [v.to!bool ? 't' : 'f'];
244         }
245         else if (v.type != Value.Type.null_)
246         {
247             dst = cast(ubyte[]) v.to!string;
248         }
249         else
250         {
251             dst.length = 0;
252         }
253     }
254 
255     private bool _exec(T)(in string stmt, Value[] args, T *result = null)
256     {
257         clear();
258 
259         _.statement = stmt.dup;
260 
261         bool       _ok;
262         string[]   _args;
263 
264         if (result !is null)
265         {
266             result.reset();
267         }
268 
269         scope(exit)
270         {
271             _debug("%s: %s -> %s", __FUNCTION__, stmt, _ok);
272         }
273 
274         foreach(c; match(_.statement, regexParam))
275         {
276             if (0 > _args.countUntil(c[0]))
277             {
278                 _args ~= c[0].dup;
279             }
280         }
281 
282         if (_args.length)
283         {
284             if (_args.length != args.length)
285             {
286                 //error(DbError.Type.query, "Invalid parameters count");
287                 return false;
288             }
289 
290             PGType[] _types;
291             Param[]  _params;
292 
293             foreach (i, ref k; _args)
294             {
295                 auto _arg = args[i];
296 
297                 Param _param;
298 
299                 if (_arg.type == Value.Type.null_) {
300                     _param.size = -1;
301                 } else {
302                     _pack(_arg, _param.data);
303                     _param.size = cast(int) _param.data.length;
304                 }
305 
306                 _params ~= _param;
307             }
308 
309             if (!(
310                  _.stream.sendParseMsg("", _.statement, _types)
311               && _.stream.sendFlushMsg
312               && _fetchReply(result, ReplyMsgType.parseComplete)
313               && _.stream.sendCloseMsg(NameOf.portal, "")
314               && _.stream.sendBindMsg("", "", _params)
315               && _.stream.sendDescribeMsg(NameOf.portal, "")
316               && _.stream.sendFlushMsg
317               && _fetchReply(result, ReplyMsgType.rowDescription)
318               && _.stream.sendExecuteMsg("", 0)
319               && _.stream.sendSyncMsg
320               && _.stream.sendFlushMsg
321             ))
322             {
323                 detach();
324                 return false;
325             }
326 
327             _ok = _fetchReply(result, ReplyMsgType.readyForQuery);
328         }
329         else
330         {
331             if (!_.stream.sendQueryMsg(_.statement))
332             {
333                 detach();
334 
335                 _debug("%s: %s -> %s", __FUNCTION__, stmt, _.errorData);
336 
337                 return false;
338             }
339 
340             _ok = _fetchReply(result);
341         }
342 
343         return _ok;
344     }
345 
346     private bool _fetchReply(T)(T *result = null, ReplyMsgType stopType = ReplyMsgType.readyForQuery)
347     {
348         import std.bitmanip: bigEndianToNative;
349 
350         do
351         {
352             auto msg = _.stream.readNext(true);
353 
354             _debug("MSG: %s", msg);
355 
356             switch(msg.type)
357             {
358                 case ReplyMsgType.errorResponse:
359                     _.errorData = unpackError(msg.data);
360                     _.error = _.errorData.get('M', "");
361                     _debug("Error: %s", _.errorData);
362                     return false;
363                 case ReplyMsgType.noticeResponse: // TODO:
364                     _debug("Notice: %s", msg);
365                     break;
366                 case ReplyMsgType.authentication:
367                     bool authOk;
368                     switch(unpackAuth(msg.data))
369                     {
370                         default:
371                             break;
372                         case AuthReply.ok:
373                             authOk = true;
374                             break;
375                         case AuthReply.md5Password:
376                             auto tkn = "md5" ~ md5HexString(md5HexString(uri.password, uri.username), msg.data);
377                             authOk   = _.stream.sendPasswordMsg(cast(ubyte[]) tkn);
378                             if (authOk)
379                             {
380                                 _.stream.waitForData;
381                             }
382                             break;
383                     }
384                     if (!authOk)
385                     {
386                         return false;
387                     }
388                     break;
389                 case ReplyMsgType.parameterStatus:
390                     foreach(ref k, ref v; unpackMap(msg.data))
391                     {
392                         _.params[k] = v;
393                     }
394                     break;
395                 case ReplyMsgType.backendKeyData:
396                     unpackBackendKey(msg.data, _.pid, _.key);
397                     break;
398                 case ReplyMsgType.rowDescription:
399                     if (result !is null)
400                     {
401                         unpackRowDescription(msg.data, _.fields);
402 
403                         result.cols = cast(uint) _.fields.length;
404 
405                         _debug("FIELDS: %s", _.fields);
406 
407                         result.fields = _.fields;
408                     }
409                     break;
410                 case ReplyMsgType.dataRow:
411                     if (result !is null)
412                     {
413                         scope(exit)
414                         {
415                             result.rowEnd(_.row);
416                             _.row++;
417                         }
418 
419                         result.rowBegin(_.row);
420 
421                         auto data = msg.data[];
422 
423                         auto fieldsCount = data[0 .. 2].bigEndianToNative!ushort;
424 
425                         data = data[2 .. $];
426 
427                         foreach(col; 0 .. fieldsCount)
428                         {
429                             auto len = data[0 .. 4].bigEndianToNative!int;
430                             data = data[4 .. $];
431 
432                             Value v;
433                             v.tag = _.fields[col].typeOid;
434 
435                             if (-1 == len)
436                             {
437                                 result.dataAdd(_.row, col, v);
438                                 continue;
439                             }
440 
441                             auto val = cast(string) data[0 .. len].dup;
442 
443                             scope(exit)
444                             {
445                                 data = data[len .. $];
446                             }
447 
448                             import std..string: split;
449 
450                             switch (v.tag) with (PGType)
451                             {
452                                 case CHAROID, VARCHAROID, TEXTOID:
453                                 default:
454                                     v = Value(val);
455                                     break;
456                                 case BOOLOID:
457                                     v = Value(val == "t");
458                                     break;
459                                 case INT2OID:
460                                 case INT4OID:
461                                     v = Value(val.to!int);
462                                     break;
463                                 case INT8OID:
464                                     v = Value(val.to!long);
465                                     break;
466                                 case FLOAT4OID:
467                                 case FLOAT8OID:
468                                     v = Value(val.to!double);
469                                     break;
470                                 case NUMERICOID:
471                                     v = Value(val.to!real);
472                                     break;
473                                 case DATEOID:
474                                     // FIXME:
475                                     //v =  Value(Date.fromISOExtString(val));
476                                     v = Value(val);
477                                     break;
478                                 case TIMESTAMPOID:
479                                     // FIXME:
480                                     //v =  Value(SysTime.fromISOExtString(str.replace(" ", "T")));
481                                     v = Value(val);
482                                     break;
483                                 case TIMESTAMPTZOID:
484                                     // FIXME:
485                                     //v = Value(SysTime.fromISOExtString(str.replace(" ", "T")));
486                                     v = Value(val);
487                                     break;
488                                 case TEXTARRAYOID:
489                                     v = Value(val[1 .. $ - 1].split(","));
490                                     break;
491                                 case JSONOID:
492                                 case JSONBOID:
493                                     v = fromJsonString(val);
494                                     break;
495                             }
496                             result.dataAdd(_.row, col, v);
497                         }
498                     }
499                     break;
500                 case ReplyMsgType.parseComplete:
501                 case ReplyMsgType.bindComplete:
502                 case ReplyMsgType.closeComplete:
503                 case ReplyMsgType.commandComplete:
504                     // TODO:
505                     break;
506                 case ReplyMsgType.readyForQuery:
507                     break;
508                 default:
509                     _debug("Unhandled message: %s", msg);
510             }
511 
512             if (msg.type == stopType)
513             {
514                 return true;
515             }
516 
517         } while (_.stream.dataAvailableForRead);
518 
519         return false;
520     }
521 }