fix: add stream name to every request when connection is created duri… · googleapis/java-bigquerystorage@c53a77c
@@ -87,7 +87,7 @@ public void setUp() throws Exception {
87878888@Test
8989public void testMultiplexedAppendSuccess() throws Exception {
90-try (ConnectionWorker connectionWorker = createConnectionWorker()) {
90+try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
9191long appendCount = 20;
9292for (long i = 0; i < appendCount; i++) {
9393testBigQueryWrite.addResponse(createAppendResponse(i));
@@ -150,7 +150,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
150150151151// We will get the request as the pattern of:
152152// (writer_stream: t1, schema: t1)
153-// (writer_stream: _, schema: _)
153+// (writer_stream: t1, schema: _)
154154// (writer_stream: t2, schema: t2) -> multiplexing entered.
155155// (writer_stream: t2, schema: _)
156156// (writer_stream: t1, schema: t1)
@@ -164,11 +164,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
164164break;
165165case 1:
166166// The write stream is empty until we enter multiplexing.
167-if (i == 1) {
168-assertThat(serverRequest.getWriteStream()).isEmpty();
169- } else {
170-assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
171- }
167+assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
172168// Schema is empty if not at the first request after table switch.
173169assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
174170break;
@@ -198,7 +194,7 @@ public void testMultiplexedAppendSuccess() throws Exception {
198194199195@Test
200196public void testAppendInSameStream_switchSchema() throws Exception {
201-try (ConnectionWorker connectionWorker = createConnectionWorker()) {
197+try (ConnectionWorker connectionWorker = createMultiplexedConnectionWorker()) {
202198long appendCount = 20;
203199for (long i = 0; i < appendCount; i++) {
204200testBigQueryWrite.addResponse(createAppendResponse(i));
@@ -279,26 +275,20 @@ public void testAppendInSameStream_switchSchema() throws Exception {
279275280276// We will get the request as the pattern of:
281277// (writer_stream: t1, schema: schema1)
282-// (writer_stream: _, schema: _)
278+// (writer_stream: t1, schema: _)
283279// (writer_stream: t1, schema: schema3)
284280// (writer_stream: t1, schema: _)
285281// (writer_stream: t1, schema: schema1)
286282// (writer_stream: t1, schema: _)
287283switch (i % 4) {
288284case 0:
289-if (i == 0) {
290-assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
291- }
285+assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
292286assertThat(
293287serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
294288 .isEqualTo("foo");
295289break;
296290case 1:
297-if (i == 1) {
298-assertThat(serverRequest.getWriteStream()).isEmpty();
299- } else {
300-assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
301- }
291+assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
302292// Schema is empty if not at the first request after table switch.
303293assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
304294break;
@@ -346,7 +336,8 @@ public void testAppendButInflightQueueFull() throws Exception {
346336client.getSettings(),
347337retrySettings,
348338/*enableRequestProfiler=*/ false,
349-/*enableOpenTelemetry=*/ false);
339+/*enableOpenTelemetry=*/ false,
340+/*isMultiplexing=*/ false);
350341testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
351342ConnectionWorker.setMaxInflightQueueWaitTime(500);
352343@@ -405,7 +396,8 @@ public void testThrowExceptionWhileWithinAppendLoop() throws Exception {
405396client.getSettings(),
406397retrySettings,
407398/*enableRequestProfiler=*/ false,
408-/*enableOpenTelemetry=*/ false);
399+/*enableOpenTelemetry=*/ false,
400+/*isMultiplexing=*/ true);
409401testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(1));
410402ConnectionWorker.setMaxInflightQueueWaitTime(500);
411403@@ -476,7 +468,8 @@ public void testLocationMismatch() throws Exception {
476468client.getSettings(),
477469retrySettings,
478470/*enableRequestProfiler=*/ false,
479-/*enableOpenTelemetry=*/ false);
471+/*enableOpenTelemetry=*/ false,
472+/*isMultiplexing=*/ true);
480473StatusRuntimeException ex =
481474assertThrows(
482475StatusRuntimeException.class,
@@ -510,7 +503,8 @@ public void testStreamNameMismatch() throws Exception {
510503client.getSettings(),
511504retrySettings,
512505/*enableRequestProfiler=*/ false,
513-/*enableOpenTelemetry=*/ false);
506+/*enableOpenTelemetry=*/ false,
507+/*isMultiplexing=*/ true);
514508StatusRuntimeException ex =
515509assertThrows(
516510StatusRuntimeException.class,
@@ -539,13 +533,13 @@ private AppendRowsResponse createAppendResponse(long offset) {
539533 .build();
540534 }
541535542-private ConnectionWorker createConnectionWorker() throws IOException {
536+private ConnectionWorker createMultiplexedConnectionWorker() throws IOException {
543537// By default use only the first table as table reference.
544-return createConnectionWorker(
538+return createMultiplexedConnectionWorker(
545539TEST_STREAM_1, TEST_TRACE_ID, 100, 1000, java.time.Duration.ofSeconds(5));
546540 }
547541548-private ConnectionWorker createConnectionWorker(
542+private ConnectionWorker createMultiplexedConnectionWorker(
549543String streamName,
550544String traceId,
551545long maxRequests,
@@ -565,7 +559,8 @@ private ConnectionWorker createConnectionWorker(
565559client.getSettings(),
566560retrySettings,
567561/*enableRequestProfiler=*/ false,
568-/*enableOpenTelemetry=*/ false);
562+/*enableOpenTelemetry=*/ false,
563+/*isMultiplexing=*/ true);
569564 }
570565571566private ProtoSchema createProtoSchema(String protoName) {
@@ -663,7 +658,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
663658client.getSettings(),
664659retrySettings,
665660/*enableRequestProfiler=*/ false,
666-/*enableOpenTelemetry=*/ false);
661+/*enableOpenTelemetry=*/ false,
662+/*isMultiplexing*/ false);
667663org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);
668664testBigQueryWrite.setResponseSleep(durationSleep);
669665@@ -740,7 +736,8 @@ public void testLongTimeIdleWontFail() throws Exception {
740736client.getSettings(),
741737retrySettings,
742738/*enableRequestProfiler=*/ false,
743-/*enableOpenTelemetry=*/ false);
739+/*enableOpenTelemetry=*/ false,
740+/*isMultiplexing*/ false);
744741745742long appendCount = 10;
746743for (int i = 0; i < appendCount * 2; i++) {
@@ -787,7 +784,8 @@ private void exerciseOpenTelemetryAttributesWithStreamNames(String streamName, S
787784client.getSettings(),
788785retrySettings,
789786/*enableRequestProfiler=*/ false,
790-/*enableOpenTelemetry=*/ true);
787+/*enableOpenTelemetry=*/ true,
788+/*isMultiplexing*/ false);
791789792790Attributes attributes = connectionWorker.getTelemetryAttributes();
793791String attributesTableId = attributes.get(TelemetryMetrics.telemetryKeyTableId);
@@ -829,7 +827,8 @@ void exerciseOpenTelemetryAttributesWithTraceId(
829827client.getSettings(),
830828retrySettings,
831829/*enableRequestProfiler=*/ false,
832-/*enableOpenTelemetry=*/ true);
830+/*enableOpenTelemetry=*/ true,
831+/*isMultiplexing*/ false);
833832834833Attributes attributes = connectionWorker.getTelemetryAttributes();
835834checkOpenTelemetryTraceIdAttribute(attributes, 0, expectedField1);