Skip to content

Commit

Permalink
[CONJS-290] possible ECONNRESET when executing batch #281
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 30, 2024
1 parent cef85db commit 0167c18
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 10 deletions.
10 changes: 7 additions & 3 deletions lib/io/packet-output-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PacketOutputStream {
this.bufContainDataAfterMark = false;
this.cmdLength = 0;
this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
this.maxAllowedPacket = opts.maxAllowedPacket || 4194304;
this.maxAllowedPacket = opts.maxAllowedPacket || 16777216;
this.maxPacketLength = Math.min(MAX_BUFFER_SIZE, this.maxAllowedPacket + 4);

this.changeEncoding(this.opts.collation ? this.opts.collation : Collations.fromIndex(224));
Expand Down Expand Up @@ -85,6 +85,10 @@ class PacketOutputStream {
newCapacity = LARGE_BUFFER_SIZE;
} else if (len + this.pos < BIG_BUFFER_SIZE) {
newCapacity = BIG_BUFFER_SIZE;
} else if (this.bufContainDataAfterMark) {
// special case for bulk: when a bunch of parameters doesn't fit in a 16MB packet,
// this saves the encoded parameters, sending data up to the mark, then resending the data after the mark
newCapacity = len + this.pos;
} else {
newCapacity = MAX_BUFFER_SIZE;
}
Expand All @@ -98,7 +102,7 @@ class PacketOutputStream {
if (len + this.pos <= this.buf.length) {
return;
}
this.growBuffer(len);
return this.growBuffer(len);
}
}

Expand Down Expand Up @@ -627,7 +631,7 @@ class PacketOutputStream {
* @throws Error if query has not to be sent.
*/
checkMaxAllowedLength(length, info) {
if (this.cmdLength + length >= this.maxAllowedPacket) {
if (this.opts.maxAllowedPacket && this.cmdLength + length >= this.maxAllowedPacket) {
// launch exception only if no packet has been sent.
return Errors.createError(
`query size (${this.cmdLength + length}) is >= to max_allowed_packet (${this.maxAllowedPacket})`,
Expand Down
74 changes: 67 additions & 7 deletions test/integration/test-big-query.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,83 @@ describe('Big query', function () {
if (!shareConn.info.isMariaDB()) this.skip();

this.timeout(30000); //can take some time
const conn = await base.createConnection({ maxAllowedSize: maxAllowedSize });
const conn = await base.createConnection({ maxAllowedPacket: maxAllowedSize});
conn.query('DROP TABLE IF EXISTS bigParameterError');
conn.query('CREATE TABLE bigParameterError (b longblob)');
await conn.query('FLUSH TABLES');

conn.beginTransaction();
try {
conn.beginTransaction();
const param = Buffer.alloc(maxAllowedSize / 2, '0').toString();
await conn.batch('insert into bigParameterError(b) values(?)', [[param], ['b'], [param]]);
} finally {
await conn.end();
}

const conn2 = await base.createConnection();
try {
const param = Buffer.alloc(maxAllowedSize, '0').toString();
await conn2.batch('insert into bigParameterError(b) values(?)', [[param], ['b'], [param]]);
throw new Error('must have thrown an error');
} catch (err) {
console.log(err);
assert.equal(err.code, 'ER_MAX_ALLOWED_PACKET');
assert.isTrue(err.message.includes('is >= to max_allowed_packet'));
assert.equal(err.sqlState, 'HY000');
assert.equal(err.code, 'ECONNRESET');
} finally {
await conn.end();
await conn2.end();
}
});

// Verifies batches whose accumulated row data exceeds the 16MB packet limit
// are still sent correctly (exercises the bufContainDataAfterMark growth path
// added in lib/io/packet-output-stream.js).
it('bunch parameter bigger than 16M', async function () {
// requires a server max_allowed_packet of at least 32MB, on MariaDB only
if (maxAllowedSize < 32 * 1024 * 1024) this.skip();
if (!shareConn.info.isMariaDB()) this.skip();

this.timeout(60000); //can take some time
const mb = 1024 * 1024;
await sendBigParamBunch(10 * mb, 10 * mb);
await sendBigParamBunch(10 * mb, 20 * mb);
await sendBigParamBunch(20 * mb, 10 * mb);
// NOTE(review): this condition looks inverted — a 33MB+20MB row cannot fit a
// server limit below 40MB, so `>=` seems intended here; confirm against the
// server's max_allowed_packet behavior before relying on this branch.
if (maxAllowedSize < 40 * 1024 * 1024) {
await sendBigParamBunch(33 * mb, 20 * mb);
}
});

/**
 * Inserts batches mixing small and very large (multi-MB) text parameters and
 * checks the rows read back match what was sent, in order.
 *
 * @param {number} firstLen  byte length of the first large parameter ('a' filled)
 * @param {number} secondLen byte length of the second large parameter ('c' filled)
 */
async function sendBigParamBunch(firstLen, secondLen) {
// NOTE(review): `maxAllowedSize` looks like a typo for the `maxAllowedPacket`
// option (the same mistake this commit fixes in the previous test) — unless it
// is deliberate, to leave opts.maxAllowedPacket unset and skip the client-side
// size check. Confirm intent.
const conn = await base.createConnection({ maxAllowedSize: maxAllowedSize });
// not awaited: the connector queues commands in order; the awaited FLUSH below
// ensures both have completed before the batch runs
conn.query('DROP TABLE IF EXISTS bigParameter2');
conn.query('CREATE TABLE bigParameter2 (a longtext, b longtext)');
await conn.query('FLUSH TABLES');
try {
conn.beginTransaction();
const param1 = Buffer.alloc(firstLen, 'a').toString();
const param2 = Buffer.alloc(secondLen, 'c').toString();
// first batch: large row sandwiched between small rows
await conn.batch('insert into bigParameter2(a,b) values(?, ?)', [
['q', 's'],
[param1, param2],
['b', 'n']
]);
// second batch: large row first, and a mixed large/small row last
await conn.batch('insert into bigParameter2(a,b) values(?, ?)', [
[param1, param2],
['q2', 's2'],
[param1, 's3']
]);
const rows = await conn.query('SELECT * from bigParameter2');

// rows 0-2 come from the first batch, rows 3-5 from the second
assert.deepEqual(rows[0], { a: 'q', b: 's' });
assert.deepEqual(rows[1], {
a: param1,
b: param2
});
assert.deepEqual(rows[2], { a: 'b', b: 'n' });
assert.deepEqual(rows[3], {
a: param1,
b: param2
});
assert.deepEqual(rows[4], { a: 'q2', b: 's2' });
assert.deepEqual(rows[5], {
a: param1,
b: 's3'
});
} finally {
// transaction is never committed; ending the connection discards it
await conn.end();
}
}
});

0 comments on commit 0167c18

Please sign in to comment.