22 #include <boost/program_options.hpp>
27 #include <drizzled/gettext.h>
28 #include <drizzled/error.h>
29 #include <drizzled/error/sql_state.h>
30 #include <drizzled/session.h>
31 #include <drizzled/internal/m_string.h>
33 #include <drizzled/util/tokenize.h>
34 #include <plugin/mysql_protocol/errmsg.h>
35 #include <plugin/mysql_protocol/mysql_protocol.h>
36 #include <plugin/mysql_protocol/mysql_password.h>
37 #include <plugin/mysql_protocol/options.h>
38 #include <drizzled/identifier.h>
39 #include <drizzled/plugin/function.h>
40 #include <drizzled/diagnostics_area.h>
41 #include <drizzled/system_variables.h>
42 #include <libdrizzle-2.0/constants.h>
// Minimum length accepted for the client's handshake reply;
// checkConnection() raises ER_HANDSHAKE_ERROR when net.read() returns less.
44 #define MIN_HANDSHAKE_SIZE 6
// Protocol version constant. Its use is not visible in this listing --
// presumably written into the server greeting; TODO confirm.
45 #define PROTOCOL_VERSION 10
47 namespace po= boost::program_options;
49 using namespace drizzled;
51 namespace drizzle_plugin {
// Third argument handed to packet.append() by the no-argument store().
53 static const unsigned int PACKET_BUFFER_EXTRA_ALLOC= 1024;
// Option-backed settings. The option-registration code further down binds
// each one to the program option of the matching name (connect-timeout,
// read-timeout, write-timeout, retry-count, buffer-length); the rest of the
// file reads them through get().
56 static timeout_constraint connect_timeout;
57 static timeout_constraint read_timeout;
58 static timeout_constraint write_timeout;
59 static retry_constraint retry_count;
60 static buffer_constraint buffer_length;
// State of the scramble generator: seeded from time(NULL) further down in
// this file and advanced by every makeScramble() call.
62 static uint32_t random_seed1;
63 static uint32_t random_seed2;
// Modulus used by all scramble arithmetic, and the same value as a double
// for scaling a state word into a fraction.
64 static const uint32_t random_max= 0x3FFFFFFF;
65 static const double random_max_double= (double)0x3FFFFFFF;
// Definition of the class-static counter set. Its max_connections member is
// bound to the "max-connections" option and registered as a variable below.
67 ProtocolCounters ListenMySQLProtocol::mysql_counters;
// Registers three ListenCounter entries in `counters`. Each entry pairs a
// newly allocated name string with the address of the matching member of
// getCounters(): connectionCount, connected and failedConnections.
// NOTE(review): the function's braces are missing from this listing.
69 void ListenMySQLProtocol::addCountersToTable()
71 counters.push_back(
new drizzled::plugin::ListenCounter(
new std::string(
"connection_count"), &getCounters().connectionCount));
72 counters.push_back(
new drizzled::plugin::ListenCounter(
new std::string(
"connected"), &getCounters().connected));
73 counters.push_back(
new drizzled::plugin::ListenCounter(
new std::string(
"failed_connections"), &getCounters().failedConnections));
// Const member returning a std::string (by its name, the listener's host).
// The body is not part of this listing.
76 const std::string ListenMySQLProtocol::getHost()
const
// Const member returning an in_port_t (by its name, the listener's port).
// The body is not part of this listing.
81 in_port_t ListenMySQLProtocol::getPort()
const
// Fragment of a later function whose header is not shown here: passes the
// descriptor `fd` to acceptTcp() and keeps the returned descriptor.
88 int new_fd= acceptTcp(fd);
// Builds a client protocol object around descriptor `fd`.
// The object starts out non-interactive and initialises its `counters`
// member from `set_counters`.
92 ClientMySQLProtocol::ClientMySQLProtocol(
int fd,
ProtocolCounters& set_counters) :
93 _is_interactive(false),
94 counters(set_counters)
// Configure the network layer from the plugin options: buffer length,
// read/write timeouts and retry count.
101 net.init(fd, buffer_length.get());
102 net.set_read_timeout(read_timeout.get());
103 net.set_write_timeout(write_timeout.get());
104 net.retry_count=retry_count.get();
// Destructor; its body is not part of this listing.
107 ClientMySQLProtocol::~ClientMySQLProtocol()
// Fragment (enclosing function header not shown): writes the accumulated
// `packet` buffer to the network and keeps net.write()'s result in `ret`.
127 bool ret= net.write(packet.ptr(), packet.length());
// Fragment (enclosing function header not shown): decrements the count of
// currently connected clients.
138 counters.connected.decrement();
// Connection-setup sequence. The enclosing function's header, its braces and
// several statements (including every return and any else) are missing from
// this listing, so the exact branch structure must be checked against the
// full source.
// Count the attempt and the newly connected client.
144 counters.connectionCount.increment();
145 counters.connected.increment();
// Use the connect timeout in both directions while the handshake runs.
148 net.set_read_timeout(connect_timeout.get());
149 net.set_write_timeout(connect_timeout.get());
// Handshake and credential check.
151 if (checkConnection())
// More clients connected than max_connections allows: report
// ER_CON_COUNT_ERROR to the client and count a failed connection.
153 if (counters.connected > counters.max_connections)
155 std::string errmsg(ER(ER_CON_COUNT_ERROR));
156 sendError(ER_CON_COUNT_ERROR, errmsg.c_str());
157 counters.failedConnections.increment();
// Report the error recorded in the session's diagnostics area and count a
// failed connection (presumably the path taken when checkConnection() fails
// -- confirm).
166 sendError(session->main_da().sql_errno(), session->main_da().message());
167 counters.failedConnections.increment();
// Switch to the regular read/write timeouts.
172 net.set_read_timeout(read_timeout.get());
173 net.set_write_timeout(write_timeout.get());
// Command-reading sequence. The enclosing function's header, braces, case
// labels and returns are missing from this listing; the first statement below
// is also cut off after its first argument.
188 drizzleclient_net_set_read_timeout(&net,
// Read one packet from the client.
193 packet_length= net.read();
194 if (packet_length == packet_error)
// An oversized packet is recorded as ER_NET_PACKET_TOO_LARGE via my_error().
198 if(net.last_errno== ER_NET_PACKET_TOO_LARGE)
199 my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
// Send the error held in the session's diagnostics area to the client.
201 sendError(session->main_da().sql_errno(), session->main_da().message());
// Hand the network layer's read position back through *l_packet.
211 *l_packet= (
char*) net.read_pos;
// A zero-length packet has its first byte set to COM_SLEEP.
222 if (packet_length == 0)
225 (*l_packet)[0]= COM_SLEEP;
// Dispatch on the command byte. Only the assignments survive in this
// listing: the command byte is rewritten to COM_SHUTDOWN, COM_KILL, COM_PING
// or COM_END; which incoming values lead to which rewrite is not visible.
231 switch ((*l_packet)[0])
240 (*l_packet)[0]= COM_SHUTDOWN;
244 (*l_packet)[0]= COM_KILL;
248 (*l_packet)[0]= COM_PING;
253 (*l_packet)[0]= COM_END;
// Write a NUL at index packet_length, i.e. just past the packet's last byte.
259 (*l_packet)[packet_length]=
'\0';
// Set the read timeout from the session variable net_read_timeout.
264 drizzleclient_net_set_read_timeout(&net, session->
variables.net_read_timeout);
// Builds a status reply in a stack buffer and writes it to the network.
// The enclosing function's header (presumably the OK-packet sender --
// confirm), its braces, the declaration of `tmp`, the else branches and the
// store of `tmp` are missing from this listing.
293 unsigned char buff[DRIZZLE_ERRMSG_SIZE+10],*pos;
294 const char *message= NULL;
// First variant: values come from the session's diagnostics area.
// Row count is found_rows() when the client set CLIENT_FOUND_ROWS and the
// value is non-zero; the affected_rows() line is presumably the else branch.
305 if (client_capabilities & CLIENT_FOUND_ROWS && session->main_da().found_rows())
306 pos=storeLength(buff+1,session->main_da().found_rows());
308 pos=storeLength(buff+1,session->main_da().affected_rows());
309 pos=storeLength(pos, session->main_da().last_insert_id());
310 int2store(pos, session->main_da().server_status());
// Warning count is clamped to 65535.
312 tmp= min(session->main_da().total_warn_count(), (uint32_t)65535);
313 message= session->main_da().message();
// Second variant: zero row count and insert id, status and warning count
// taken directly from the session.
317 pos=storeLength(buff+1,0);
318 pos=storeLength(pos, 0);
319 int2store(pos, session->server_status);
321 tmp= min(session->total_warn_count, (uint32_t)65535);
// A non-empty message is appended as a length prefix followed by its bytes.
// NOTE(review): no check of `length` against the size of buff is visible in
// this listing -- confirm against the full source.
330 if (message && message[0])
332 size_t length= strlen(message);
333 pos=storeLength(pos,length);
334 memcpy(pos,(
unsigned char*) message,length);
// Send everything written so far (buff up to pos).
337 net.write(buff, pos - buff);
// Fragment (enclosing function header not shown): emits an EOF packet using
// the server status and warning count from the session's diagnostics area.
364 writeEOFPacket(session->main_da().server_status(), session->main_da().total_warn_count());
// Fragment (enclosing function header not shown): calls packet.shrink() with
// the configured buffer length.
368 packet.shrink(buffer_length.get());
// Builds an error reply for `sql_errno` / `err`. The function header, braces,
// the store into buff[2], any early returns and the final network write are
// missing from this listing.
// Buffer layout: 2 bytes error number, 1 further byte, SQLSTATE, message.
375 unsigned char buff[2 + 1 + SQLSTATE_LENGTH + DRIZZLE_ERRMSG_SIZE];
// Callers must pass a real error code and a non-empty message.
377 assert(sql_errno != EE_OK);
378 assert(err && err[0]);
// Clear the "more results" bit in the session status.
387 session->server_status&= ~SERVER_MORE_RESULTS_EXISTS;
// Error number as a 16-bit value at buff[0..1].
402 int2store(buff, static_cast<uint16_t>(sql_errno));
// SQLSTATE text for this error is copied to buff+3; pos then moves past it.
406 unsigned char* pos= (
unsigned char*) strcpy((
char*) buff+3, error::convert_to_sqlstate(sql_errno));
407 pos+= strlen(error::convert_to_sqlstate(sql_errno));
// Message text follows, limited to DRIZZLE_ERRMSG_SIZE-1 bytes.
// NOTE(review): strncpy leaves the destination unterminated when `err` is
// that long or longer, and strlen() is applied right after -- confirm that a
// terminator is guaranteed by code not shown here.
409 char *tmp= strncpy((
char*)pos, err, DRIZZLE_ERRMSG_SIZE-1);
410 tmp+= strlen((
char*)pos);
// Sends the column definitions for a result set. The function header, braces,
// the `switch` line, every `break`, the declarations of `it` and `field`,
// and the per-column network write are missing from this listing.
438 unsigned char buff[80];
439 String tmp((
char*) buff,
sizeof(buff),&my_charset_bin);
// Column count goes out first, encoded with storeLength().
441 unsigned char *row_pos= storeLength(buff, list.size());
442 (void) net.write(buff, row_pos - buff);
// One definition per item in the list.
444 while (
Item* item=it++)
447 item->make_field(&field);
// Length-prefixed strings: the literal "def", database, table, original
// table, column and original column names.
451 store(STRING_WITH_LEN(
"def"));
452 store(field.db_name);
453 store(field.table_name);
454 store(field.org_table_name);
455 store(field.col_name);
456 store(field.org_col_name);
// Make room for the fixed-size part and write it at the packet's current end.
457 packet.realloc(packet.length()+12);
460 char* pos= (
char*) packet.ptr()+packet.length();
// Offsets 0-1: charset number; offsets 2-5: field length.
463 int2store(pos, field.charsetnr);
464 int4store(pos+2, field.length);
// Offset 6: column type byte, mapped from the Drizzle type.
// MICROTIME, UUID and IPV6 are all reported as VARCHAR, BOOLEAN as TINY.
471 case DRIZZLE_TYPE_LONG:
472 pos[6]= DRIZZLE_COLUMN_TYPE_LONG;
475 case DRIZZLE_TYPE_DOUBLE:
476 pos[6]= DRIZZLE_COLUMN_TYPE_DOUBLE;
479 case DRIZZLE_TYPE_NULL:
480 pos[6]= DRIZZLE_COLUMN_TYPE_NULL;
483 case DRIZZLE_TYPE_TIMESTAMP:
484 pos[6]= DRIZZLE_COLUMN_TYPE_TIMESTAMP;
487 case DRIZZLE_TYPE_LONGLONG:
488 pos[6]= DRIZZLE_COLUMN_TYPE_LONGLONG;
491 case DRIZZLE_TYPE_DATETIME:
492 pos[6]= DRIZZLE_COLUMN_TYPE_DATETIME;
495 case DRIZZLE_TYPE_TIME:
496 pos[6]= DRIZZLE_COLUMN_TYPE_TIME;
499 case DRIZZLE_TYPE_DATE:
500 pos[6]= DRIZZLE_COLUMN_TYPE_DATE;
503 case DRIZZLE_TYPE_VARCHAR:
504 pos[6]= DRIZZLE_COLUMN_TYPE_VARCHAR;
507 case DRIZZLE_TYPE_MICROTIME:
508 pos[6]= DRIZZLE_COLUMN_TYPE_VARCHAR;
511 case DRIZZLE_TYPE_UUID:
512 pos[6]= DRIZZLE_COLUMN_TYPE_VARCHAR;
515 case DRIZZLE_TYPE_IPV6:
516 pos[6]= DRIZZLE_COLUMN_TYPE_VARCHAR;
519 case DRIZZLE_TYPE_BOOLEAN:
520 pos[6]= DRIZZLE_COLUMN_TYPE_TINY;
523 case DRIZZLE_TYPE_DECIMAL:
524 pos[6]= (char)DRIZZLE_COLUMN_TYPE_NEWDECIMAL;
527 case DRIZZLE_TYPE_ENUM:
528 pos[6]= (char)DRIZZLE_COLUMN_TYPE_ENUM;
531 case DRIZZLE_TYPE_BLOB:
532 pos[6]= (char)DRIZZLE_COLUMN_TYPE_BLOB;
// Fallback mapping (presumably the default label or a separate code path --
// confirm): field type plus one.
539 pos[6]= field.type + 1;
// Offsets 7-8: flags; offset 9: decimals.
542 int2store(pos+7,field.flags);
543 pos[9]= (char) field.decimals;
// Set the packet length from pos; pos is presumably advanced past the fixed
// part on a line not shown here.
548 packet.length((uint32_t) (pos - packet.ptr()));
// Finish with an EOF packet carrying the session's status and warning count.
558 writeEOFPacket(session->server_status, session->total_warn_count);
// Appends the value of a Field to the packet.
// Boolean fields go through an integer store() overload using val_int();
// any other field is rendered by val_str_internal() into a String built over
// a MAX_FIELD_WIDTH-byte stack buffer and appended with netStoreData().
// NOTE(review): braces and any lines between these statements are missing
// from this listing.
561 void ClientMySQLProtocol::store(
Field *from)
565 if (from->type() == DRIZZLE_TYPE_BOOLEAN)
567 return store(from->val_int());
570 char buff[MAX_FIELD_WIDTH];
571 String str(buff,
sizeof(buff), &my_charset_bin);
573 from->val_str_internal(&str);
575 netStoreData(str.ptr(), str.length());
// Appends the contents of `buff` to the packet, passing
// PACKET_BUFFER_EXTRA_ALLOC as append()'s third argument. The declaration and
// contents of `buff` are not part of this listing (presumably the marker for
// a NULL value -- confirm).
578 void ClientMySQLProtocol::store()
582 packet.append(buff,
sizeof(buff), PACKET_BUFFER_EXTRA_ALLOC);
// Integer overloads of store(). Each formats the value into `buff` (its
// declaration is not part of this listing) and appends the characters with
// netStoreData(); the length is the distance from the start of buff to the
// pointer returned by the formatter. The signed overloads pass radix -10 and
// the unsigned ones 10 (presumably selecting signed vs. unsigned decimal
// formatting -- confirm against the m_string helpers).
585 void ClientMySQLProtocol::store(int32_t from)
588 netStoreData(buff, internal::int10_to_str(from, buff, -10) - buff);
591 void ClientMySQLProtocol::store(uint32_t from)
594 netStoreData(buff, internal::int10_to_str(from, buff, 10) - buff);
597 void ClientMySQLProtocol::store(int64_t from)
600 netStoreData(buff, internal::int64_t10_to_str(from, buff, -10) - buff);
603 void ClientMySQLProtocol::store(uint64_t from)
606 netStoreData(buff, internal::int64_t10_to_str(from, buff, 10) - buff);
// Appends a double: the value is rendered into the caller-supplied `buffer`
// with set_real(), using `decimals` and the session's charset, and the
// resulting bytes are appended with netStoreData().
609 void ClientMySQLProtocol::store(
double from, uint32_t decimals,
String *buffer)
611 buffer->set_real(from, decimals, session->charset());
612 netStoreData(buffer->ptr(), buffer->length());
// Appends `length` bytes starting at `from`; a thin wrapper over
// netStoreData().
615 void ClientMySQLProtocol::store(
const char *from,
size_t length)
617 netStoreData(from, length);
// True when the network layer's error flag is set while net.vio is still
// non-zero.
620 bool ClientMySQLProtocol::wasAborted()
622 return net.error_ && net.vio != 0;
// True when the network layer's error flag is set or net.vio is zero.
625 bool ClientMySQLProtocol::haveError()
627 return net.error_ || net.vio == 0;
// Performs the handshake: sends the server greeting, parses the client's
// reply (capabilities, user, password, database) and finally asks the session
// to check the user. Returns the result of session->checkUser().
// NOTE(review): braces, returns after each my_error(), several declarations
// (ip, peer_port, end, pkt_len, user, l_db, passwd_len) and other statements
// are missing from this listing; comments below describe only what is shown.
630 bool ClientMySQLProtocol::checkConnection()
632 char scramble[SCRAMBLE_LENGTH];
633 identifier::user::mptr user_identifier= identifier::User::make_shared();
// Fresh challenge bytes for this connection.
635 makeScramble(scramble);
// Resolve the peer address; a non-zero result is reported as
// ER_BAD_HOST_ERROR.
642 if (net.peer_addr(ip, NI_MAXHOST, peer_port))
644 my_error(ER_BAD_HOST_ERROR, MYF(0), ip);
648 user_identifier->setAddress(ip);
654 uint32_t server_capabilites;
// Greeting buffer: version string, thread id, scramble and fixed fields.
657 char buff[SERVER_VERSION_LENGTH + SCRAMBLE_LENGTH + 64];
659 server_capabilites= CLIENT_BASIC_FLAGS | CLIENT_PROTOCOL_MYSQL41;
// Compression capability is added here; any guarding condition is not shown.
662 server_capabilites|= CLIENT_COMPRESS;
// Copy the release version, truncated to SERVER_VERSION_LENGTH - 1 bytes.
665 end= buff + strlen(PANDORA_RELEASE_VERSION);
666 if ((end - buff) >= SERVER_VERSION_LENGTH)
667 end= buff + (SERVER_VERSION_LENGTH - 1);
668 memcpy(buff, PANDORA_RELEASE_VERSION, end - buff);
// Four bytes of the session's pseudo thread id.
672 int4store((
unsigned char*) end, session->
variables.pseudo_thread_id);
// First SCRAMBLE_LENGTH_323 bytes of the scramble.
676 memcpy(end, scramble, SCRAMBLE_LENGTH_323);
677 end+= SCRAMBLE_LENGTH_323;
// Capabilities (2 bytes), default charset number, server status (2 bytes),
// then 13 zero bytes.
680 int2store(end, server_capabilites);
682 end[2]=(char) default_charset_info->number;
683 int2store(end+3, session->server_status);
684 memset(end+5, 0, 13);
// Remaining bytes of the scramble.
688 memcpy(end, scramble + SCRAMBLE_LENGTH_323, SCRAMBLE_LENGTH - SCRAMBLE_LENGTH_323);
689 end+= (SCRAMBLE_LENGTH - SCRAMBLE_LENGTH_323);
// Tail of a condition whose start is not shown: a failed read of the reply,
// or a reply shorter than MIN_HANDSHAKE_SIZE, is a handshake error.
694 || (pkt_len= net.read()) == packet_error
695 || pkt_len < MIN_HANDSHAKE_SIZE)
697 my_error(ER_HANDSHAKE_ERROR, MYF(0), user_identifier->address().c_str());
701 packet.alloc(buffer_length.get());
// Low 16 capability bits; clients without CLIENT_PROTOCOL_MYSQL41 are
// rejected.
703 client_capabilities= uint2korr(net.read_pos);
704 if (not (client_capabilities & CLIENT_PROTOCOL_MYSQL41))
706 my_error(ER_HANDSHAKE_ERROR, MYF(0), user_identifier->address().c_str());
// High 16 capability bits come from the next two bytes.
710 client_capabilities|= ((uint32_t) uint2korr(net.read_pos + 2)) << 16;
// Variable-length data is taken to start 32 bytes into the reply.
712 end= (
char*) net.read_pos + 32;
// Keep only capabilities the server also offers.
718 client_capabilities&= server_capabilites;
// The start of the variable part must lie inside the packet.
720 if (end >= (
char*) net.read_pos + pkt_len + 2)
722 my_error(ER_HANDSHAKE_ERROR, MYF(0), user_identifier->address().c_str());
// The password field starts right after the NUL that ends the user name.
// NOTE(review): no bound on this NUL search is visible here -- confirm the
// read buffer is always NUL-terminated.
727 char *passwd= strchr(user,
'\0')+1;
728 uint32_t user_len= passwd - user - 1;
// With CLIENT_SECURE_CONNECTION the password is prefixed by a one-byte
// length. A non-empty password is tagged PLAIN_TEXT when the client set
// CLIENT_CAPABILITIES_PLUGIN_AUTH, otherwise MYSQL_HASH with the scramble as
// its context.
738 if (client_capabilities & CLIENT_SECURE_CONNECTION &&
739 passwd < (
char *) net.read_pos + pkt_len)
741 passwd_len= (
unsigned char)(*passwd++);
742 if (passwd_len > 0 and client_capabilities & CLIENT_CAPABILITIES_PLUGIN_AUTH)
744 user_identifier->setPasswordType(identifier::User::PLAIN_TEXT);
746 else if (passwd_len > 0)
748 user_identifier->setPasswordType(identifier::User::MYSQL_HASH);
749 user_identifier->setPasswordContext(scramble, SCRAMBLE_LENGTH);
// With CLIENT_CONNECT_WITH_DB the database name follows the password.
757 if (client_capabilities & CLIENT_CONNECT_WITH_DB &&
758 passwd < (
char *) net.read_pos + pkt_len)
760 l_db= l_db + passwd_len + 1;
768 uint32_t db_len= l_db ? strlen(l_db) : 0;
// Password plus database name must not run past the end of the packet.
770 if (passwd + passwd_len + db_len > (
char *) net.read_pos + pkt_len)
772 my_error(ER_HANDSHAKE_ERROR, MYF(0), user_identifier->address().c_str());
// User name wrapped in single quotes; the handling itself is not shown.
777 if (user_len > 1 && user[0] ==
'\'' && user[user_len - 1] ==
'\'')
// Remember whether the client declared itself interactive.
784 if (client_capabilities & CLIENT_INTERACTIVE)
786 _is_interactive=
true;
// With plugin auth the password length is recomputed with strlen().
789 if (client_capabilities & CLIENT_CAPABILITIES_PLUGIN_AUTH)
791 passwd_len= strlen(passwd);
// Attach the identity to the session and let it check the user against the
// password and database (empty string when no database was given).
794 user_identifier->setUser(user);
795 session->setUser(user_identifier);
797 return session->
checkUser(
string(passwd, passwd_len),
string(l_db ? l_db :
""));
// Appends `length` bytes from `from` to the packet, preceded by the length
// as written by storeLength().
// NOTE(review): the function's braces are missing from this listing.
801 void ClientMySQLProtocol::netStoreData(
const void* from,
size_t length)
803 size_t packet_length= packet.length();
// Grow the packet when prefix plus payload would not fit. The 9 is presumably
// the largest prefix storeLength() can write -- confirm.
808 if (packet_length+9+length > packet.alloced_length())
809 packet.realloc(packet_length+9+length);
// Write the prefix at the current end, then the payload right behind it.
810 unsigned char *to= storeLength((
unsigned char*) packet.ptr()+packet_length, length);
811 memcpy(to,from,length);
// New packet length is the offset just past the copied payload.
812 packet.length((
size_t) (to+length-(
unsigned char*) packet.ptr()));
// Builds a 5-byte end-of-data packet. The enclosing function's header, braces
// and the network write are missing from this listing.
822 unsigned char buff[5];
// Warning count is clamped to 65535.
827 uint32_t tmp= min(total_warn_count, (uint32_t)65535);
// Byte 0: "no more data" marker; bytes 1-2: warning count.
828 buff[0]= DRIZZLE_PROTOCOL_NO_MORE_DATA;
829 int2store(buff+1, tmp);
// Clear the "more results" bit (any guarding condition is not shown), then
// store the status in bytes 3-4.
836 server_status&= ~SERVER_MORE_RESULTS_EXISTS;
837 int2store(buff + 3, server_status);
// Writes `length` at `buffer` in a variable-width form and returns an
// unsigned char pointer; callers in this file continue writing at the
// returned pointer.
// Visible forms: one byte, two bytes, three bytes (when length < 16777216)
// and eight bytes otherwise. The conditions selecting the first two forms,
// any marker bytes and the return statements are missing from this listing.
854 unsigned char *ClientMySQLProtocol::storeLength(
unsigned char *buffer, uint64_t length)
858 *buffer= (
unsigned char) length;
865 int2store(buffer, (uint32_t) length);
868 if (length < 16777216)
871 int3store(buffer, (uint32_t) length);
875 int8store(buffer, length);
// Fills `scramble` with SCRAMBLE_LENGTH generated characters. No terminator
// is written by the lines shown here.
// NOTE(review): braces and the declaration of `pointer` are missing from this
// listing. The generator is simple modular arithmetic over file-scope state
// seeded from time(NULL); assess whether that is adequate for an
// authentication challenge.
879 void ClientMySQLProtocol::makeScramble(
char *scramble)
// Advance the file-scope generator state and derive a 32-bit seed from it.
882 random_seed1= (random_seed1 * 3 + random_seed2) % random_max;
883 random_seed2= (random_seed1 + random_seed2 + 33) % random_max;
884 uint32_t seed=
static_cast<uint32_t
>((
static_cast<double>(random_seed1) / random_max_double) * 0xffffffff);
// Mix in four bytes of `pointer`, the session's pseudo thread id and the
// connection's file descriptor to get the two local state words.
887 uint32_t pointer_seed;
888 memcpy(&pointer_seed, &pointer, 4);
889 uint32_t random1= (seed + pointer_seed) % random_max;
890 uint32_t random2= (seed + session->
variables.pseudo_thread_id + net.vio->
get_fd()) % random_max;
// One character per step: random1 < random_max, so the scaled value lies in
// [33, 127) before truncation, i.e. characters 33..126.
892 for (
char *end= scramble + SCRAMBLE_LENGTH; scramble != end; scramble++)
894 random1= (random1 * 3 + random2) % random_max;
895 random2= (random1 + random2 + 33) % random_max;
896 *scramble=
static_cast<char>((
static_cast<double>(random1) / random_max_double) * 94 + 33);
// Plugin start-up sequence. The enclosing function's header, braces and
// return are missing from this listing (presumably drizzle_plugin::init,
// which the plugin declaration below references -- confirm).
// Seed the scramble generator state from the current time.
903 time_t seed_time= time(NULL);
904 random_seed1= seed_time % random_max;
905 random_seed2= (seed_time / 2) % random_max;
// Register the MySQLPassword function under MySQLPasswordName.
909 context.add(
new plugin::Create_function<MySQLPassword>(MySQLPasswordName));
// Create the listener named "mysql_protocol" on the configured bind address,
// register its counters and add it to the plugin context.
911 ListenMySQLProtocol* listen_obj=
new ListenMySQLProtocol(
"mysql_protocol", vm[
"bind-address"].as<std::string>());
912 listen_obj->addCountersToTable();
913 context.add(listen_obj);
// Register max_connections as the variable "max-connections".
921 context.registerVariable(
new sys_var_uint32_t_ptr(
"max-connections", &ListenMySQLProtocol::mysql_counters.max_connections));
// Option registration. The enclosing function's header and braces are missing
// from this listing (presumably drizzle_plugin::init_options -- confirm), and
// several entries have lost their name or description line.
// Port option (name line not shown, description cut off): default 3306.
929 po::value<port_constraint>(&port)->default_value(3306),
930 _(
"Port number to use for connection or 0 for default to with MySQL "
// connect-timeout: default 10, stored in connect_timeout.
932 context(
"connect-timeout",
933 po::value<timeout_constraint>(&connect_timeout)->default_value(10),
934 _(
"Connect Timeout."));
// read-timeout: default 30, stored in read_timeout (description not shown).
935 context(
"read-timeout",
936 po::value<timeout_constraint>(&read_timeout)->default_value(30),
// write-timeout: default 60, stored in write_timeout.
938 context(
"write-timeout",
939 po::value<timeout_constraint>(&write_timeout)->default_value(60),
940 _(
"Write Timeout."));
// retry-count: default 10, stored in retry_count (description not shown).
941 context(
"retry-count",
942 po::value<retry_constraint>(&retry_count)->default_value(10),
// buffer-length: default 16384, stored in buffer_length.
944 context(
"buffer-length",
945 po::value<buffer_constraint>(&buffer_length)->default_value(16384),
946 _(
"Buffer length."));
// bind-address: default "localhost"; read back from the variables map when
// the listener is created.
947 context(
"bind-address",
948 po::value<string>()->default_value(
"localhost"),
949 _(
"Address to bind to."));
// max-connections: default 1000, stored directly in
// mysql_counters.max_connections.
950 context(
"max-connections",
951 po::value<uint32_t>(&ListenMySQLProtocol::mysql_counters.max_connections)->default_value(1000),
952 _(
"Maximum simultaneous connections."));
// Plugin declaration. Only some of its entries survive in this listing: the
// translatable description string and the init / init_options entry points.
957 DRIZZLE_DECLARE_PLUGIN
963 N_(
"MySQL network protocol"),
965 drizzle_plugin::init,
967 drizzle_plugin::init_options
969 DRIZZLE_DECLARE_PLUGIN_END;
void writeEOFPacket(uint32_t server_status, uint32_t total_warn_count)
virtual bool authenticate()
bool checkUser(const std::string &passwd, const std::string &db)
A proxy wrapper around boost::program_options::variables_map.
bool can_overwrite_status
virtual void sendFields(drizzled::List< drizzled::Item > &)
virtual bool readCommand(char **packet, uint32_t &packet_length)
virtual void sendError(const drizzled::error_t sql_errno, const char *err)
virtual bool isConnected()
virtual int getFileDescriptor()
drizzle_system_variables & variables