diff --git a/core/src/main/java/com/google/adk/flows/llmflows/Functions.java b/core/src/main/java/com/google/adk/flows/llmflows/Functions.java index a952d602d..4fc1667d9 100644 --- a/core/src/main/java/com/google/adk/flows/llmflows/Functions.java +++ b/core/src/main/java/com/google/adk/flows/llmflows/Functions.java @@ -45,6 +45,7 @@ import io.opentelemetry.context.Scope; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Single; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; @@ -199,15 +200,16 @@ public static Maybe handleFunctionCalls( }); }; - Flowable functionResponseEventsFlowable; + Observable functionResponseEventsObservable; if (invocationContext.runConfig().toolExecutionMode() == ToolExecutionMode.SEQUENTIAL) { - functionResponseEventsFlowable = - Flowable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); + functionResponseEventsObservable = + Observable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); } else { - functionResponseEventsFlowable = - Flowable.fromIterable(functionCalls).flatMapMaybe(functionCallMapper); + functionResponseEventsObservable = + Observable.fromIterable(functionCalls) + .concatMapEager(call -> functionCallMapper.apply(call).toObservable()); } - return functionResponseEventsFlowable + return functionResponseEventsObservable .toList() .flatMapMaybe( events -> { @@ -310,18 +312,18 @@ public static Maybe handleFunctionCallsLive( }); }; - Flowable responseEventsFlowable; + Observable responseEventsObservable; if (invocationContext.runConfig().toolExecutionMode() == ToolExecutionMode.SEQUENTIAL) { - responseEventsFlowable = - Flowable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); - + responseEventsObservable = + Observable.fromIterable(functionCalls).concatMapMaybe(functionCallMapper); } else { - responseEventsFlowable = - Flowable.fromIterable(functionCalls).flatMapMaybe(functionCallMapper); + responseEventsObservable = + Observable.fromIterable(functionCalls) + .concatMapEager(call -> functionCallMapper.apply(call).toObservable()); } - return responseEventsFlowable + return responseEventsObservable .toList() .flatMapMaybe( events -> { diff --git a/core/src/test/java/com/google/adk/flows/llmflows/FunctionsTest.java b/core/src/test/java/com/google/adk/flows/llmflows/FunctionsTest.java index d880d7d85..b9d99d39c 100644 --- a/core/src/test/java/com/google/adk/flows/llmflows/FunctionsTest.java +++ b/core/src/test/java/com/google/adk/flows/llmflows/FunctionsTest.java @@ -167,7 +167,8 @@ public void handleFunctionCalls_multipleFunctionCalls() { .name("echo_tool") .response(ImmutableMap.of("result", args2)) .build()) - .build()); + .build()) + .inOrder(); } @Test @@ -229,7 +230,9 @@ public void populateClientFunctionCallId_withExistingId_noChange() { .build())) .build(); - Functions.populateClientFunctionCallId(event); - assertThat(event).isEqualTo(event); + Event eventCopy = Event.fromJson(event.toJson()); + Functions.populateClientFunctionCallId(eventCopy); + + assertThat(eventCopy).isEqualTo(event); } }