File:  [parser3project] / parser3 / src / sql / pgsql / Attic / parser3pgsql.C
Revision 1.13: download - view: text, annotated - select for diffs - revision graph
Wed Sep 5 08:57:43 2001 UTC (24 years, 9 months ago) by parser
Branches: MAIN
CVS tags: HEAD
SQL_DRIVER_CREATE_FUNC_NAME

/** @file
	Parser PgSQL driver.

	Copyright(c) 2001 ArtLebedev Group(http://www.artlebedev.com)

	Author: Alexander Petrosyan <paf@design.ru>(http://design.ru/paf)

	2001.07.30 using PgSQL 7.1.2
*/
static const char *RCSId="$Id: parser3pgsql.C,v 1.13 2001/09/05 08:57:43 parser Exp $"; 

#include "config_includes.h"

#include "pa_sql_driver.h"

#include <libpq-fe.h>
#include <libpq/libpq-fs.h>

// OIDOID from catalog/pg_type.h
// compared against PQftype() in query() to detect ObjectID columns
#define OIDOID			26
// LO_BUFSIZE from interfaces\libpq\fe-lobj.c = 8192 (0x2000)
// actually writing chunks of that size failed, so it is halved here
#define LO_BUFSIZE		  0x1000
// from postgres_ext.h
#define InvalidOid		((Oid) 0)


#include "ltdl.h"

// extra slack added to the statement buffer allocated in preprocess_statement()
#define MAX_STRING 0x400
// buffer room reserved for one formatted number (limit, offset, oid)
#define MAX_NUMBER 20

// under MSVC map snprintf/strcasecmp onto the _snprintf/_stricmp names
#if _MSC_VER
#	define snprintf _snprintf
#	define strcasecmp _stricmp
#endif

#ifndef max
/// larger of two ints (only defined when no `max` macro is already present)
inline int max(int a,int b) {
	if(a>b)
		return a;
	return b;
}
/// smaller of two ints
inline int min(int a,int b) {
	if(a<b)
		return a;
	return b;
}
#endif

/**
	Split @a string in place at the first occurrence of @a delim.

	The delimiter is overwritten with a terminating zero, so @a string
	becomes the left part.

	@return pointer to the text right after the delimiter,
		or 0 when @a string is 0 or contains no @a delim
*/
static char *lsplit(char *string, char delim) {
	if(!string)
		return 0;

	char *hit=strchr(string, delim);
	if(!hit)
		return 0;

	*hit=0; // cut the left part off
	return hit+1;
}

/**
	PgSQL server driver.

	All libpq entry points are reached through member function pointers
	that dlink() resolves from the client library named in initialize().

	A transaction is kept open all the time: connect(), commit() and
	rollback() each finish by issuing BEGIN.
*/
class PgSQL_Driver : public SQL_Driver {
public:

	PgSQL_Driver() : SQL_Driver() {
	}

	/// get api version
	int api_version() { return SQL_DRIVER_API_VERSION; }
	/// initialize driver by loading sql dynamic link library
	/// @return 0 on success, otherwise a static error message
	const char *initialize(const char *dlopen_file_spec) {
		return dlopen_file_spec?
			dlink(dlopen_file_spec):"client library column is empty";
	}

	// needs `services` and `conn` in the scope where it is used
	#define throwPQerror services._throw(PQerrorMessage(conn))

	/**	connect
		@param used_only_in_connect_url
			format: @b user:pass@host[:port]|[local]/database
			(the string is cut into pieces in place)
		@param services used to report errors
		@param connection output: PGconn *
	*/
	void connect(
		char *used_only_in_connect_url, 
		SQL_Driver_services& services, 
		void **connection ///< output: PGconn *
		) {
		char *user=used_only_in_connect_url;
		char *host=lsplit(user, '@');
		char *db=lsplit(host, '/');
		char *pwd=lsplit(user, ':');
		char *port=lsplit(host, ':');

		// no '@' means no host part; strcasecmp(NULL, ...) below would crash.
		// NOTE(review): like the other throw sites in this class, this
		// assumes services._throw() does not return -- confirm.
		if(!host)
			services._throw("connect string must be in user:pass@host[:port]/database format");

		PGconn *conn=PQsetdbLogin(
			strcasecmp(host, "local")==0?NULL/* local Unix domain socket */:host, port, 
			NULL, NULL, db, user, pwd);
		if(!conn)
			services._throw("PQsetdbLogin failed");
		// NOTE(review): conn is not released with PQfinish on this path
		if(PQstatus(conn)!=CONNECTION_OK)  
			throwPQerror;

		*(PGconn **)connection=conn;
		begin_transaction(services, conn);
	}
	/// close the connection
	void disconnect(void *connection) {
	    PQfinish((PGconn *)connection);
	}
	/// COMMIT current transaction and open the next one.
	/// NOTE(review): only a NULL PQexec result is treated as failure,
	/// the result status is not inspected.
	void commit(SQL_Driver_services& services, void *connection) {
		PGconn *conn=(PGconn *)connection;
		if(PGresult *res=PQexec(conn, "COMMIT"))
			PQclear(res);
		else
			throwPQerror;
		begin_transaction(services, conn);
	}
	/// ROLLBACK current transaction and open the next one.
	/// NOTE(review): only a NULL PQexec result is treated as failure,
	/// the result status is not inspected.
	void rollback(SQL_Driver_services& services, void *connection) {
		PGconn *conn=(PGconn *)connection;
		if(PGresult *res=PQexec(conn, "ROLLBACK"))
			PQclear(res);
		else
			throwPQerror;
		begin_transaction(services, conn);
	}

	/// @return true when libpq reports the connection as CONNECTION_OK
	bool ping(SQL_Driver_services&, void *connection) {
		return PQstatus((PGconn *)connection)==CONNECTION_OK;
	}

	/**	escape @a length bytes of @a from into @a to:
		"'" becomes "''" and "\" becomes "\\".

		No terminating zero is written.
		Worst case output is 2*length bytes; per the original author's note
		the caller's buffer is already UNTAINT_TIMES_BIGGER than the input.

		@return number of bytes written to @a to
	*/
	unsigned int quote(
		SQL_Driver_services&, void *connection,
		char *to, const char *from, unsigned int length) {
		char *begin=to;
		while(length--) {
			switch(*from) {
			case '\'': // "'" -> "''"
				*to++='\'';
				break;
			case '\\': // "\" -> "\\"
				*to++='\\';
				break;
			}
			*to++=*from++;
		}
		return (unsigned int)(to-begin);
	}
	/**	execute a statement and feed its result to @a handlers.

		The statement first goes through preprocess_statement().
		For a result with tuples the call order is:
		add_column() per column, before_rows(),
		then add_row() followed by add_row_cell() per column for each row.
		Cells of columns whose type is OIDOID are replaced by the bytes of
		the large object with that id; empty cells are passed as (0, 0).

		Errors are reported through services._throw().
	*/
	void query(
		SQL_Driver_services& services, void *connection, 
		const char *astatement, unsigned long offset, unsigned long limit,
		SQL_Driver_query_event_handlers& handlers) {

		PGconn *conn=(PGconn *)connection;
		// free the result first, then report; `res` must be in scope
		#define PQclear_throw(msg) do { \
				PQclear(res); \
				services._throw(msg); \
			} while(0)
		#define PQclear_throwPQerror PQclear_throw(PQerrorMessage(conn))

		const char *statement=preprocess_statement(services, conn,
			astatement, offset, limit);

		PGresult *res=PQexec(conn, statement);
		if(!res) 
			throwPQerror;

		switch(PQresultStatus(res)) {
		case PGRES_EMPTY_QUERY: 
			PQclear_throw("no query");
			break;
		case PGRES_COMMAND_OK:
			// empty result: insert|delete|update|...
			PQclear(res);
			return;
		case PGRES_TUPLES_OK: 
			break;	
		default:
			PQclear_throwPQerror;
			break;
		}
		
		int column_count=PQnfields(res);
		if(!column_count)
			PQclear_throw("result contains no columns");

		// column names are copied without terminating zero, length goes along
		for(int i=0; i<column_count; i++){
			char *name=PQfname(res, i);
			size_t size=strlen(name);
			void *ptr=services.malloc(size);
			memcpy(ptr, name, size);
			handlers.add_column(ptr, size);
		}

		handlers.before_rows();

		int row_count=PQntuples(res);
		for(int r=0; r<row_count; r++) {
			handlers.add_row();
			for(int c=0; c<column_count; c++) {
				const char *cell=PQgetvalue(res, r, c);
				size_t size=0;
				void *ptr=0;
				if(PQftype(res, c)==OIDOID) {
					// ObjectID column, read object bytes

					// Oid is unsigned, so parse it as such
					Oid oid=cell?(Oid)strtoul(cell, 0, 10):0;
					int fd=lo_open(conn, oid, INV_READ);
					if(fd<0)
						PQclear_throwPQerror;
					// seek to end
					if(lo_lseek(conn, fd, 0, SEEK_END)<0)
						PQclear_throwPQerror;
					// get size
					int size_tell=lo_tell(conn, fd);
					if(size_tell<0)
						PQclear_throwPQerror;
					// seek to begin
					if(lo_lseek(conn, fd, 0, SEEK_SET)<0)
						PQclear_throwPQerror;
					size=(size_t)size_tell;
					if(size) {
						// read 
						ptr=services.malloc(size);
						if(!lo_read_ex(conn, fd, (char *)ptr, size))
							PQclear_throw("lo_read can not read all bytes of object");
					}
					if(lo_close(conn, fd)<0)
						PQclear_throwPQerror;
				} else {
					// normal column, read it normally
					size=(size_t)PQgetlength(res, r, c);
					if(size) {
						ptr=services.malloc(size);
						memcpy(ptr, cell, size);
					}
				}
				handlers.add_row_cell(ptr, size);
			}
		}

		PQclear(res);
	}

private: // private funcs

	/// issue BEGIN; only a NULL PQexec result is treated as failure
	void begin_transaction(SQL_Driver_services& services, PGconn *conn) {
		if(PGresult *res=PQexec(conn, "BEGIN"))
			PQclear(res);
		else
			throwPQerror;
	}

	/**	build the statement that is actually sent to the server.

		- every <tt>/</tt><tt>**name**</tt><tt>/'literal'</tt> is replaced by
		  the decimal id of a newly created large object holding the
		  un-escaped literal ("''" -> "'", "\x" -> "x");
		- " limit N" and " offset N" are appended for nonzero
		  @a limit / @a offset.

		Input is always read from @a astatement and written to a separate
		buffer, so an object id longer than the text it replaces can not
		overwrite input that was not scanned yet.

		A "/" "**" that is never closed by "**" "/'" drops the rest of the
		original statement, as before.

		NOTE(review): the buffer has MAX_STRING bytes of slack for ids that
		are longer than the text they replace; a statement with very many
		near-empty literals could still exceed it.

		@return zero-terminated statement allocated with services.malloc
	*/
	const char *preprocess_statement(SQL_Driver_services& services, PGconn *conn,
		const char *astatement, unsigned long offset, unsigned long limit) {
		size_t statement_size=strlen(astatement);

		char *result=(char *)services.malloc(statement_size
			+MAX_NUMBER*2+15 // limit # offset #
			+MAX_STRING // in case of short 'strings'
			+1);

		// /**xxx**/'literal' -> oid
		const char *o=astatement;
		char *n=result;
		while(*o) {
			if(
				o[0]=='/' &&
				o[1]=='*' && 
				o[2]=='*') { // name start
				o+=3;
				while(*o)
					if(
						o[0]=='*' &&
						o[1]=='*' &&
						o[2]=='/' &&
						o[3]=='\'') { // name end
						o+=4;
						Oid oid=lo_creat(conn, INV_READ|INV_WRITE);
						if(oid==InvalidOid)
							throwPQerror;
						int fd=lo_open(conn, oid, INV_WRITE);
						if(fd>=0) {
							const char *start=o;
							while(*o) {
								if(*o=='\\' || (o[0]=='\'' && o[1]=='\'')) {
									// write pending, skip "\" or "'"
									if(!lo_write_ex(conn, fd, start, o-start))
										services._throw("lo_write could not write all bytes of object (1)");
									start=++o;
									// the escaped char is data, never an escape itself
									if(*o)
										o++;
								} else if(*o=='\'')
									break; // closing "'"
								else
									o++;
							}
							if(!lo_write_ex(conn, fd, start, o-start))
								services._throw("lo_write can not write all bytes of object (2)");
							if(lo_close(conn, fd)<0)
								throwPQerror;
						} else
							throwPQerror;
						if(*o)
							o++; // skip "'"

						n+=snprintf(n, MAX_NUMBER, "%u", oid);
						break;
					} else
						o++; // /**skip**/'xxx'
			} else
				*n++=*o++;
		}

		// offset & limit -> suffixes
		if(limit)
			n+=snprintf(n, 7+MAX_NUMBER+1, " limit %lu", limit);
		if(offset)
			n+=snprintf(n, 8+MAX_NUMBER+1, " offset %lu", offset);
		*n=0;

		return result;
	}

private: // lo_read/write exchancements

	/// read exactly @a len bytes of the object in chunks of at most LO_BUFSIZE
	/// @return true when all @a len bytes were read
	bool lo_read_ex(PGconn *conn, int fd, char *buf, size_t len) {
		int size_read;
		while(len && (size_read=lo_read(conn, fd, buf, 
			len<LO_BUFSIZE?len:(size_t)LO_BUFSIZE))>0) {
			buf+=size_read;
			len-=size_read;
		}
		return len==0;
	}

	/// write exactly @a len bytes to the object in chunks of at most LO_BUFSIZE
	/// @return true when all @a len bytes were written
	bool lo_write_ex(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len) {
		int size_written;
		while(len && (size_written=lo_write(conn, fd, buf, 
			len<LO_BUFSIZE?len:(size_t)LO_BUFSIZE))>0) {
			buf+=size_written;
			len-=size_written;
		}
		return len==0;
	}

private: // conn client library funcs

	// each typedef is followed by the member pointer that dlink() fills in

	typedef PGconn* (*t_PQsetdbLogin)(
		const char *pghost,
		const char *pgport,
		const char *pgoptions,
		const char *pgtty,
		const char *dbName,
		const char *login,
		const char *pwd); t_PQsetdbLogin PQsetdbLogin;
	typedef void (*t_PQfinish)(PGconn *conn);  t_PQfinish PQfinish;
	typedef char *(*t_PQerrorMessage)(const PGconn* conn); t_PQerrorMessage PQerrorMessage;
	typedef ConnStatusType (*t_PQstatus)(const PGconn *conn); t_PQstatus PQstatus;
	typedef PGresult *(*t_PQexec)(PGconn *conn,
	                 const char *query); t_PQexec PQexec;
	typedef ExecStatusType (*t_PQresultStatus)(const PGresult *res); t_PQresultStatus PQresultStatus;
	typedef int (*t_PQgetlength)(const PGresult *res,
					int tup_num,
					int field_num); t_PQgetlength PQgetlength;
	typedef char* (*t_PQgetvalue)(const PGresult *res,
					 int tup_num,
					 int field_num); t_PQgetvalue PQgetvalue;
	typedef int (*t_PQntuples)(const PGresult *res); t_PQntuples PQntuples;
	typedef char *(*t_PQfname)(const PGresult *res,
						int field_index); t_PQfname PQfname;
	typedef int (*t_PQnfields)(const PGresult *res); t_PQnfields PQnfields;
	typedef void (*t_PQclear)(PGresult *res); t_PQclear PQclear;

	typedef Oid	(*t_PQftype)(const PGresult *res, int field_num); t_PQftype PQftype;

	typedef int	(*t_lo_open)(PGconn *conn, Oid lobjId, int mode); t_lo_open lo_open;
	typedef int	(*t_lo_close)(PGconn *conn, int fd); t_lo_close lo_close;
	// lo_read fills buf, so it is not const here
	typedef int	(*t_lo_read)(PGconn *conn, int fd, char *buf, size_t len); t_lo_read lo_read;
	typedef int	(*t_lo_write)(PGconn *conn, int fd, const/*paf*/ char *buf, size_t len); t_lo_write lo_write;
	typedef int	(*t_lo_lseek)(PGconn *conn, int fd, int offset, int whence); t_lo_lseek lo_lseek;
	typedef Oid	(*t_lo_creat)(PGconn *conn, int mode); t_lo_creat lo_creat;
	typedef int	(*t_lo_tell)(PGconn *conn, int fd); t_lo_tell lo_tell;
	typedef int	(*t_lo_unlink)(PGconn *conn, Oid lobjId); t_lo_unlink lo_unlink;
	typedef Oid	(*t_lo_import)(PGconn *conn, const char *filename); t_lo_import lo_import;
	typedef int	(*t_lo_export)(PGconn *conn, Oid lobjId, const char *filename); t_lo_export lo_export;

private: // conn client library funcs linking

	/// open the client library and resolve every function pointer above
	/// @return 0 on success, otherwise a static error message
	const char *dlink(const char *dlopen_file_spec) {
        lt_dlhandle handle=lt_dlopen(dlopen_file_spec);
        if(!handle)
			return "can not open the dynamic link module";

		#define DSLINK(name, action) \
			name=(t_##name)lt_dlsym(handle, #name); \
				if(!name) \
					action;

		#define DLINK(name) DSLINK(name, return "function " #name " was not found")
		
		DLINK(PQsetdbLogin);
		DLINK(PQerrorMessage);
		DLINK(PQstatus);
		DLINK(PQfinish);
		DLINK(PQgetvalue);
		DLINK(PQgetlength);
		DLINK(PQntuples);
		DLINK(PQfname);
		DLINK(PQnfields);
		DLINK(PQclear);
		DLINK(PQresultStatus);
		DLINK(PQexec);
		DLINK(PQftype);
		DLINK(lo_open);		DLINK(lo_close);
		DLINK(lo_read);		DLINK(lo_write);
		DLINK(lo_lseek);		DLINK(lo_creat);
		DLINK(lo_tell);		DLINK(lo_unlink);
		DLINK(lo_import);		DLINK(lo_export);

		return 0;
	}

};

/// Driver factory with C linkage: hands out a newly allocated PgSQL_Driver.
extern "C" SQL_Driver *SQL_DRIVER_CREATE_FUNC_NAME() {
	SQL_Driver *driver=new PgSQL_Driver;
	return driver;
}

E-mail: