--- sql/pgsql/parser3pgsql.C 2002/12/09 12:36:19 1.9 +++ sql/pgsql/parser3pgsql.C 2008/06/26 15:49:40 1.30 @@ -1,13 +1,13 @@ /** @file Parser PgSQL driver. - Copyright(c) 2001, 2002 ArtLebedev Group (http://www.artlebedev.com) + Copyright(c) 2001, 2003 ArtLebedev Group (http://www.artlebedev.com) Author: Alexandr Petrosian (http://paf.design.ru) - 2001.07.30 using PgSQL 7.1.2 + 2007.10.25 using PgSQL 8.1.5 */ -static const char *RCSId="$Id: parser3pgsql.C,v 1.9 2002/12/09 12:36:19 paf Exp $"; +static const char *RCSId="$Id: parser3pgsql.C,v 1.30 2008/06/26 15:49:40 misha Exp $"; #include "config_includes.h" @@ -22,7 +22,7 @@ static const char *RCSId="$Id: parser3pg // actually writing chunks of that size failed, reduced it twice #define LO_BUFSIZE 0x1000 // from postgres_ext.h -#define InvalidOid ((Oid) 0) +//#define InvalidOid ((Oid) 0) #include "ltdl.h" @@ -36,28 +36,50 @@ static const char *RCSId="$Id: parser3pg #endif #ifndef max -inline int max(int a,int b) { return a>b?a:b; } +inline int max(int a,int b){ return a>b?a:b; } inline int min(int a,int b){ return a_throw(PQerrorMessage(connection.conn)) #define PQclear_throw(msg) { \ PQclear(res); \ - services._throw(msg); \ + connection.services->_throw(msg); \ } - #define PQclear_throwPQerror PQclear_throw(PQerrorMessage(conn)) + #define PQclear_throwPQerror PQclear_throw(PQerrorMessage(connection.conn)) /** connect - @param used_only_in_connect_url - format: @b user:pass@host[:port]|[local]/database + @param url + format: @b user:pass@host[:port]|[local]/database? + ClientCharset=charset& // transcode by parser + charset=value& // transcode by server with 'SET CLIENT_ENCODING=value' + datestyle=value& // 'SET DATESTYLE=value' available values are: ISO|SQL|Postgres|European|US|German [default=ISO] + autocommit=1& + WithoutDefaultTransaction=1 // == autocommit=0 */ void connect( - char *used_only_in_connect_url, - SQL_Driver_services& services, - void **connection ///< output: PGconn * - ) { - char *user=used_only_in_connect_url; - char *host=lsplit(user, '@'); - char *db=lsplit(host, '/'); - char *pwd=lsplit(user, ':'); - char *port=lsplit(host, ':'); + char* url, + SQL_Driver_services& services, + void** connection_ref ///< output: Connection* + ){ + char* user=url; + char* host=rsplit(user, '@'); + char* db=lsplit(host, '/'); + char* pwd=lsplit(user, ':'); + char* port=lsplit(host, ':'); char *options=lsplit(db, '?'); - PGconn *conn=PQsetdbLogin( + char* charset=0; + char* datestyle=0; + + Connection& connection=*(Connection *)services.malloc(sizeof(Connection)); + *connection_ref=&connection; + connection.services=&services; + connection.client_charset=0; + connection.autocommit=true; + connection.conn=PQsetdbLogin( (host&&strcasecmp(host, "local")==0)?NULL/* local Unix domain socket */:host, port, NULL, NULL, db, user, pwd); - if(!conn) + + if(!connection.conn) services._throw("PQsetdbLogin failed"); - if(PQstatus(conn)!=CONNECTION_OK) - throwPQerror; - char *charset=0; - char *datestyle=0; + if(PQstatus(connection.conn)!=CONNECTION_OK) + throwPQerror; - while(options) { - if(char *key=lsplit(&options, '&')) { - if(*key) { - if(char *value=lsplit(key, '=')) { - if(strcasecmp(key, "charset")==0) { + while(options){ + if(char *key=lsplit(&options, '&')){ + if(*key){ + if(char *value=lsplit(key, '=')){ + if(strcmp(key, "ClientCharset")==0){ // transcoding with parser + toupper_str(value, value, strlen(value)); + connection.client_charset=value; + } else if(strcasecmp(key, "charset")==0){ // transcoding with server charset=value; - } else if(strcasecmp(key, "datestyle")==0) { + } else if(strcasecmp(key, "datestyle")==0){ datestyle=value; + } else if(strcasecmp(key, "autocommit")==0){ + if(atoi(value)==0) + connection.autocommit=false; + } else if(strcmp(key, "WithoutDefaultTransaction")==0){ // backward, use autocommit=0 + if(atoi(value)==1) + connection.autocommit=false; } else services._throw("unknown connect option" /*key*/); } else @@ -126,103 +170,112 @@ public: } } - if(charset) { - // set CLIENT_ENCODING - char statement[MAX_STRING]="set CLIENT_ENCODING="; // win + if(charset){ + char statement[MAX_STRING]="SET CLIENT_ENCODING="; strncat(statement, charset, MAX_STRING); - - PGresult *res=PQexec(conn, statement); - if(!res) - throwPQerror; - PQclear(res); // throw out the result [don't need but must call] + + _execute_cmd(connection, statement); } - if(datestyle) { - // set DATESTYLE - char statement[MAX_STRING]="set DATESTYLE="; // ISO,SQL,Postgres,European,NonEuropean=US,German,DEFAULT=ISO - strncat(statement, charset, MAX_STRING); - - PGresult *res=PQexec(conn, statement); - if(!res) - throwPQerror; - PQclear(res); // throw out the result [don't need but must call] + if(datestyle){ + char statement[MAX_STRING]="SET DATESTYLE="; + strncat(statement, datestyle, MAX_STRING); + + _execute_cmd(connection, statement); } - *(PGconn **)connection=conn; - begin_transaction(services, conn); + _begin_transaction(connection); } - void disconnect(void *connection) { - PQfinish((PGconn *)connection); + + void disconnect(void *aconnection){ + Connection& connection=*static_cast(aconnection); + PQfinish(connection.conn); + connection.conn=0; } - void commit(SQL_Driver_services& services, void *connection) { - PGconn *conn=(PGconn *)connection; - if(PGresult *res=PQexec(conn, "COMMIT")) - PQclear(res); - else - throwPQerror; - begin_transaction(services, conn); + + void commit(void *aconnection){ + Connection& connection=*static_cast(aconnection); + if(connection.autocommit){ + _execute_cmd(connection, "COMMIT"); + } + _begin_transaction(connection); } - void rollback(SQL_Driver_services& services, void *connection) { - PGconn *conn=(PGconn *)connection; - if(PGresult *res=PQexec(conn, "ROLLBACK")) - PQclear(res); - else - throwPQerror; - begin_transaction(services, conn); + + void rollback(void *aconnection){ + Connection& connection=*static_cast(aconnection); + if(connection.autocommit){ + _execute_cmd(connection, "ROLLBACK"); + } + _begin_transaction(connection); } - bool ping(SQL_Driver_services&, void *connection) { - return PQstatus((PGconn *)connection)==CONNECTION_OK; + bool ping(void *aconnection) { + Connection& connection=*static_cast(aconnection); + return PQstatus(connection.conn)==CONNECTION_OK; } - unsigned int quote( - SQL_Driver_services&, void *connection, - char *to, const char *from, unsigned int length) { - if(to) { // store mode - unsigned int result=length; - while(length--) { - switch(*from) { - case '\'': // "'" -> "''" - *to++='\''; result++; - break; - case '\\': // "\" -> "\\" - *to++='\''; result++; - break; - } - *to++=*from++; - } - return result; - } else // estimate mode - return length*2; - } - void query( - SQL_Driver_services& services, void *connection, - const char *astatement, unsigned long offset, unsigned long limit, - SQL_Driver_query_event_handlers& handlers) { -// _asm int 3; + const char* quote(void *aconnection, const char *from, unsigned int length){ + Connection& connection=*static_cast(aconnection); + + char *result=(char*)connection.services->malloc_atomic(length*2+1); + int err=0; + PQescapeStringConn(connection.conn, result, from, length, &err); + return result; + } + + void query(void *aconnection, + const char *astatement, + size_t placeholders_count, Placeholder* placeholders, + unsigned long offset, unsigned long limit, + SQL_Driver_query_event_handlers& handlers + ){ + Connection& connection=*static_cast(aconnection); + const char* client_charset=connection.client_charset; + SQL_Driver_services& services=*connection.services; + PGconn *conn=connection.conn; + + bool transcode_needed=_transcode_required(connection); + + const char** paramValues; + if(placeholders_count>0){ + int binds_size=sizeof(char)*placeholders_count; + paramValues = static_cast(services.malloc_atomic(binds_size)); + _bind_parameters(placeholders_count, placeholders, paramValues, connection, transcode_needed); + } - PGconn *conn=(PGconn *)connection; + // transcode query from $request:charset to ?ClientCharset + if(transcode_needed){ + size_t length=strlen(astatement); + services.transcode(astatement, length, + astatement, length, + services.request_charset(), + connection.client_charset); + } - const char *statement=preprocess_statement(services, conn, - astatement, offset, limit); + const char *statement=_preprocess_statement(connection, astatement, offset, limit); + // error after prepare? - PGresult *res=PQexec(conn, statement); + PGresult *res; + if(placeholders_count>0){ + res=PQexecParams(conn, statement, placeholders_count, NULL, paramValues, NULL, NULL, 0); + } else { + res=PQexec(conn, statement); + } if(!res) throwPQerror; switch(PQresultStatus(res)) { - case PGRES_EMPTY_QUERY: - PQclear_throw("no query"); - break; - case PGRES_COMMAND_OK: - // empty result: insert|delete|update|... - PQclear(res); - return; - case PGRES_TUPLES_OK: - break; - default: - PQclear_throwPQerror; - break; + case PGRES_EMPTY_QUERY: + PQclear_throw("no query"); + break; + case PGRES_COMMAND_OK: // empty result: insert|delete|update|... + PQclear(res); + return; + case PGRES_TUPLES_OK: + break; + default: + PQclear_throwPQerror; + break; } int column_count=PQnfields(res); @@ -239,10 +292,19 @@ public: for(int i=0; itranscode(ph.name, strlen(ph.name), + ph.name, name_length, + connection.services->request_charset(), + connection.client_charset); + + if(ph.value) { + connection.services->transcode(ph.value, strlen(ph.value), + ph.value, value_length, + connection.services->request_charset(), + connection.client_charset); + } + } + int name_numner=atoi(ph.name); + if(name_numner <= 0 || name_numner > placeholders_count) + connection.services->_throw("bad bind parameter key"); - void begin_transaction(SQL_Driver_services& services, PGconn *conn) { - if(PGresult *res=PQexec(conn, "BEGIN")) - PQclear(res); + paramValues[name_numner-1]=ph.value; + } + } + + + /** + Executes a query and throw away the result. + */ + void _execute_cmd(const Connection& connection, const char *query){ + if(PGresult *res=PQexec(connection.conn, query)) + PQclear(res); // throw out the result [don't need but must call] else throwPQerror; } - const char *preprocess_statement(SQL_Driver_services& services, PGconn *conn, - const char *astatement, unsigned long offset, unsigned long limit) { + void _begin_transaction(Connection& connection){ + if(connection.autocommit) + _execute_cmd(connection, "BEGIN"); + } + + const char *_preprocess_statement( + Connection& connection, + const char *astatement, + unsigned long offset, + unsigned long limit + ){ + PGconn *conn=connection.conn; + size_t statement_size=strlen(astatement); - char *result=(char *)services.malloc(statement_size + char *result=(char *)connection.services->malloc(statement_size +MAX_NUMBER*2+15 // limit # offset # +MAX_STRING // in case of short 'strings' +1); // offset & limit -> suffixes const char *o; - if(offset || limit) { + if(offset || limit!=SQL_NO_LIMIT){ char *cur=result; memcpy(cur, astatement, statement_size); cur+=statement_size; - if(limit) + if(limit!=SQL_NO_LIMIT) cur+=snprintf(cur, 7+MAX_NUMBER, " limit %u", limit); if(offset) cur+=snprintf(cur, 8+MAX_NUMBER, " offset %u", offset); @@ -338,6 +458,7 @@ private: // private funcs o[0]=='/' && o[1]=='*' && o[2]=='*') { // name start + const char* saved_o=o; o+=3; while(*o) if( @@ -345,6 +466,7 @@ private: // private funcs o[1]=='*' && o[2]=='/' && o[3]=='\'') { // name end + saved_o=0; // found, marking that o+=4; Oid oid=lo_creat(conn, INV_READ|INV_WRITE); if(oid==InvalidOid) @@ -358,13 +480,13 @@ private: // private funcs if(escaped) { // write pending, skip "\" or "'" if(!lo_write_ex(conn, fd, start, o-start)) - services._throw("lo_write could not write all bytes of object (1)"); + connection.services->_throw("lo_write could not write all bytes of object (1)"); start=++o; } else o++; } if(!lo_write_ex(conn, fd, start, o-start)) - services._throw("lo_write can not write all bytes of object (2)"); + connection.services->_throw("lo_write can not write all bytes of object (2)"); if(lo_close(conn, fd)<0) throwPQerror; } else @@ -376,6 +498,10 @@ private: // private funcs break; } else o++; // /**skip**/'xxx' + if(saved_o) { + o=saved_o; + *n++=*o++; + } } else *n++=*o++; } @@ -384,22 +510,25 @@ private: // private funcs return result; } + bool _transcode_required(Connection& connection){ + return (connection.client_charset && strcmp(connection.client_charset, connection.services->request_charset())!=0); + } + private: // lo_read/write exchancements bool lo_read_ex(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len) { - int size_read; - while(len && (size_read=lo_read(conn, fd, buf, min(LO_BUFSIZE, len)))>0) { - buf+=size_read; - len-=size_read; - } - return len==0; + return lo_rw_method (conn, fd, buf, len, lo_read); } bool lo_write_ex(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len) { - int size_written; - while(len && (size_written=lo_write(conn, fd, buf, min(LO_BUFSIZE, len)))>0) { - buf+=size_written; - len-=size_written; + return lo_rw_method (conn, fd, buf, len, lo_write); + } + + bool lo_rw_method(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len, int (*lo_func)(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len)) { + int size_op; + while(len && (size_op=lo_func(conn, fd, buf, min(LO_BUFSIZE, len)))>0) { + buf+=size_op; + len-=size_op; } return len==0; } @@ -418,15 +547,25 @@ private: // conn client library funcs typedef char *(*t_PQerrorMessage)(const PGconn* conn); t_PQerrorMessage PQerrorMessage; typedef ConnStatusType (*t_PQstatus)(const PGconn *conn); t_PQstatus PQstatus; typedef PGresult *(*t_PQexec)(PGconn *conn, - const char *query); t_PQexec PQexec; + const char *query); t_PQexec PQexec; + typedef PGresult *(*t_PQexecParams)( + PGconn *conn, + const char *query, + int nParams, + const Oid *paramTypes, + const char * const *paramValues, + const int *paramLengths, + const int *paramFormats, + int resultFormat); t_PQexecParams PQexecParams; + typedef ExecStatusType (*t_PQresultStatus)(const PGresult *res); t_PQresultStatus PQresultStatus; typedef int (*t_PQgetlength)(const PGresult *res, - int tup_num, - int field_num); t_PQgetlength PQgetlength; + int tup_num, + int field_num); t_PQgetlength PQgetlength; typedef char* (*t_PQgetvalue)(const PGresult *res, - int tup_num, - int field_num); t_PQgetvalue PQgetvalue; - typedef int (*t_PQntuples)(const PGresult *res); t_PQntuples PQntuples; + int tup_num, + int field_num); t_PQgetvalue PQgetvalue; + typedef int (*t_PQntuples)(const PGresult *res); t_PQntuples PQntuples; typedef char *(*t_PQfname)(const PGresult *res, int field_index); t_PQfname PQfname; typedef int (*t_PQnfields)(const PGresult *res); t_PQnfields PQnfields; @@ -434,6 +573,10 @@ private: // conn client library funcs typedef Oid (*t_PQftype)(const PGresult *res, int field_num); t_PQftype PQftype; + typedef size_t (*t_PQescapeStringConn)(PGconn *conn, + char *to, const char *from, size_t length, + int *error); t_PQescapeStringConn PQescapeStringConn; + typedef int (*t_lo_open)(PGconn *conn, Oid lobjId, int mode); t_lo_open lo_open; typedef int (*t_lo_close)(PGconn *conn, int fd); t_lo_close lo_close; typedef int (*t_lo_read)(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len); t_lo_read lo_read; @@ -448,8 +591,10 @@ private: // conn client library funcs private: // conn client library funcs linking const char *dlink(const char *dlopen_file_spec) { - lt_dlhandle handle=lt_dlopen(dlopen_file_spec); - if(!handle) + if(lt_dlinit()) + return lt_dlerror(); + lt_dlhandle handle=lt_dlopen(dlopen_file_spec); + if(!handle) return "can not open the dynamic link module"; #define DSLINK(name, action) \ @@ -471,7 +616,9 @@ private: // conn client library funcs li DLINK(PQclear); DLINK(PQresultStatus); DLINK(PQexec); + DLINK(PQexecParams); DLINK(PQftype); + DLINK(PQescapeStringConn); DLINK(lo_open); DLINK(lo_close); DLINK(lo_read); DLINK(lo_write); DLINK(lo_lseek); DLINK(lo_creat); @@ -480,7 +627,6 @@ private: // conn client library funcs li return 0; } - }; extern "C" SQL_Driver *SQL_DRIVER_CREATE() {