fix: add stream name to every request when connection is created duri… · googleapis/java-bigquerystorage@c53a77c

@@ -87,7 +87,7 @@ public void setUp() throws Exception {

87878888

@Test

8989

public void testMultiplexedAppendSuccess() throws Exception {

90-

try (ConnectionWorker connectionWorker = createConnectionWorker()) {

90+

try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {

9191

long appendCount = 20;

9292

for (long i = 0; i < appendCount; i++) {

9393

testBigQueryWrite.addResponse(createAppendResponse(i));

@@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception {

150150151151

// We will get the request as the pattern of:

152152

// (writer_stream: t1, schema: t1)

153-

// (writer_stream: _, schema: _)

153+

// (writer_stream: t1, schema: _)

154154

// (writer_stream: t2, schema: t2) -> multiplexing entered.

155155

// (writer_stream: t2, schema: _)

156156

// (writer_stream: t1, schema: t1)

@@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception {

164164

break;

165165

case 1:

166166

// The write stream is empty until we enter multiplexing.

167-

if (i == 1) {

168-

assertThat(serverRequest.getWriteStream()).isEmpty();

169-

} else {

170-

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

171-

}

167+

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

172168

// Schema is empty if not at the first request after table switch.

173169

assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();

174170

break;

@@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception {

198194199195

@Test

200196

public void testAppendInSameStream_switchSchema() throws Exception {

201-

try (ConnectionWorker connectionWorker = createConnectionWorker()) {

197+

try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {

202198

long appendCount = 20;

203199

for (long i = 0; i < appendCount; i++) {

204200

testBigQueryWrite.addResponse(createAppendResponse(i));

@@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {

279275280276

// We will get the request as the pattern of:

281277

// (writer_stream: t1, schema: schema1)

282-

// (writer_stream: _, schema: _)

278+

// (writer_stream: t1, schema: _)

283279

// (writer_stream: t1, schema: schema3)

284280

// (writer_stream: t1, schema: _)

285281

// (writer_stream: t1, schema: schema1)

286282

// (writer_stream: t1, schema: _)

287283

switch (i % 4) {

288284

case 0:

289-

if (i == 0) {

290-

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

291-

}

285+

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

292286

assertThat(

293287

serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())

294288

.isEqualTo("foo");

295289

break;

296290

case 1:

297-

if (i == 1) {

298-

assertThat(serverRequest.getWriteStream()).isEmpty();

299-

} else {

300-

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

301-

}

291+

assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);

302292

// Schema is empty if not at the first request after table switch.

303293

assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();

304294

break;

@@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception {

346336

client.getSettings(),

347337

retrySettings,

348338

/*enableRequestProfiler=*/ false,

349-

/*enableOpenTelemetry=*/ false);

339+

/*enableOpenTelemetry=*/ false,

340+

/*isMultiplexing=*/ false);

350341

testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));

351342

ConnectionWorker.setMaxInflightQueueWaitTime(500);

352343

@@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {

405396

client.getSettings(),

406397

retrySettings,

407398

/*enableRequestProfiler=*/ false,

408-

/*enableOpenTelemetry=*/ false);

399+

/*enableOpenTelemetry=*/ false,

400+

/*isMultiplexing=*/ true);

409401

testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));

410402

ConnectionWorker.setMaxInflightQueueWaitTime(500);

411403

@@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception {

476468

client.getSettings(),

477469

retrySettings,

478470

/*enableRequestProfiler=*/ false,

479-

/*enableOpenTelemetry=*/ false);

471+

/*enableOpenTelemetry=*/ false,

472+

/*isMultiplexing=*/ true);

480473

StatusRuntimeException ex =

481474

assertThrows(

482475

StatusRuntimeException.class,

@@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception {

510503

client.getSettings(),

511504

retrySettings,

512505

/*enableRequestProfiler=*/ false,

513-

/*enableOpenTelemetry=*/ false);

506+

/*enableOpenTelemetry=*/ false,

507+

/*isMultiplexing=*/ true);

514508

StatusRuntimeException ex =

515509

assertThrows(

516510

StatusRuntimeException.class,

@@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) {

539533

.build();

540534

}

541535542-

private ConnectionWorker createConnectionWorker() throws IOException {

536+

private ConnectionWorker createMultiplexedConnectionWorker() throws IOException {

543537

// By default use only the first table as table reference.

544-

return createConnectionWorker(

538+

return createMultiplexedConnectionWorker(

545539

TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));

546540

}

547541548-

private ConnectionWorker createConnectionWorker(

542+

private ConnectionWorker createMultiplexedConnectionWorker(

549543

String streamName,

550544

String traceId,

551545

long maxRequests,

@@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker(

565559

client.getSettings(),

566560

retrySettings,

567561

/*enableRequestProfiler=*/ false,

568-

/*enableOpenTelemetry=*/ false);

562+

/*enableOpenTelemetry=*/ false,

563+

/*isMultiplexing=*/ true);

569564

}

570565571566

private ProtoSchema createProtoSchema(String protoName) {

@@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E

663658

client.getSettings(),

664659

retrySettings,

665660

/*enableRequestProfiler=*/ false,

666-

/*enableOpenTelemetry=*/ false);

661+

/*enableOpenTelemetry=*/ false,

662+

/*isMultiplexing*/ false);

667663

org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);

668664

testBigQueryWrite.setResponseSleep(durationSleep);

669665

@@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception {

740736

client.getSettings(),

741737

retrySettings,

742738

/*enableRequestProfiler=*/ false,

743-

/*enableOpenTelemetry=*/ false);

739+

/*enableOpenTelemetry=*/ false,

740+

/*isMultiplexing*/ false);

744741745742

long appendCount = 10;

746743

for (int i = 0; i < appendCount * 2; i++) {

@@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S

787784

client.getSettings(),

788785

retrySettings,

789786

/*enableRequestProfiler=*/ false,

790-

/*enableOpenTelemetry=*/ true);

787+

/*enableOpenTelemetry=*/ true,

788+

/*isMultiplexing*/ false);

791789792790

Attributes attributes = connectionWorker.getTelemetryAttributes();

793791

String attributesTableId = attributes.get(TelemetryMetrics.telemetryKeyTableId);

@@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(

829827

client.getSettings(),

830828

retrySettings,

831829

/*enableRequestProfiler=*/ false,

832-

/*enableOpenTelemetry=*/ true);

830+

/*enableOpenTelemetry=*/ true,

831+

/*isMultiplexing*/ false);

833832834833

Attributes attributes = connectionWorker.getTelemetryAttributes();

835834

checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);