Skip to content

Stage 3: IUniTaskAsyncEnumerable adapter for incremental byte/text streams#297

Open
MaxHeimbrock wants to merge 1 commit into
max/yield-instruction-unitask-extensionfrom
max/yield-instruction-async-enumerable
Open

Stage 3: IUniTaskAsyncEnumerable adapter for incremental byte/text streams#297
MaxHeimbrock wants to merge 1 commit into
max/yield-instruction-unitask-extensionfrom
max/yield-instruction-async-enumerable

Conversation

@MaxHeimbrock

Copy link
Copy Markdown
Contributor

Summary

  • Adds a single generic extension AsAsyncEnumerable<TChunk>(this ReadIncrementalInstructionBase<TChunk>, CancellationToken) in the gated LiveKit.UniTask assembly, so ByteStreamReader/TextStreamReader incremental reads can be consumed with await foreach (returns IUniTaskAsyncEnumerable<byte[]> / <string>).
  • Builds on Stage 1's StreamYieldInstruction awaiter and Stage 2's AsUniTask. The loop mirrors the coroutine consumer's observable behavior: await a chunk → yield it → re-check IsEos after yielding (since Reset() is disallowed past EoS) → Reset() for the next chunk.
var reader = await OpenByteStream(...);
try {
    await foreach (var chunk in reader.ReadIncremental().AsAsyncEnumerable(ct))
        Process(chunk);
} catch (StreamError e) {
    Debug.LogError(e.Message);
}

Design decisions

  • Error model: on EoS carrying a StreamError, the enumerable throws it (idiomatic for await foreach — the one place the UniTask surface throws rather than exposing IsError). Note: once an error is set, LatestChunk itself throws, so a chunk and an error never surface from the same step (a chunk delivered before the error is observed normally; the error then ends iteration).
  • Cancellation: OperationCanceledException, abandon-awaiter semantics (consistent with Stage 2).
  • Scope: byte/text incremental readers only. DataTrack.ReadFrameInstruction is out of scope (no awaiter, no Reset()) — possible follow-up.
  • No sample migration: the Meet sample reads no incremental streams; Stage 3 is adapter + deterministic tests.

Supporting changes (core LiveKit asm, behavior-preserving)

  • Widen StreamYieldInstruction.IsCurrentReadDone getter and ReadIncrementalInstructionBase<T>.LatestChunk to public so the separate LiveKit.UniTask assembly can drive the loop. Both already have public equivalents on the sibling DataTrack.ReadFrameInstruction.
  • Add UniTask.Linq reference to the runtime + test UniTask asmdefs (source of UniTaskAsyncEnumerable.Create / IUniTaskAsyncEnumerable).
  • Extend InternalsVisibleTo to PlayModeTests.UniTask so the deterministic tests can construct a synthetic reader via the public FfiHandle seam the EditMode tests already use.

Testing

Tests/PlayMode/UniTask/StreamUniTaskTests.cs (gated by LIVEKIT_UNITASK), using a synthetic TestIncrementalReader : ReadIncrementalInstructionBase<string> with PushChunk/PushEos — the same FFI-free seam as DataStreamIncrementalReadTests:

  • delivers chunks in order then stops at EoS (manual enumeration, interleaved push/pull)
  • delivers the final chunk when EoS is already set, then stops
  • delivers a chunk then throws StreamError on a subsequent error EoS
  • already-cancelled token throws OperationCanceledException, observes nothing

Test plan

  • Scripts~/run_unity.sh build macos — clean compile.
  • Scripts~/run_unity.sh test -m PlayMode -f "AsAsyncEnumerable|AsUniTask|GetAwaiter" — 8 passed, 0 failed (4 new stream tests + inherited Stage 1/2).
  • Scripts~/run_unity.sh test -m EditMode — 70 passed, 2 skipped (confirms the visibility widening didn't regress DataStreamIncrementalReadTests).

Targeting

Base branch is max/yield-instruction-unitask-extension (Stage 2, PR #290), so this diff shows only the Stage 3 surface.

🤖 Generated with Claude Code

Stage 3 of the UniTask migration. Exposes ByteStreamReader/TextStreamReader
incremental reads as IUniTaskAsyncEnumerable<TChunk> so chunks can be consumed
with `await foreach`, building on Stage 1's StreamYieldInstruction awaiter and
Stage 2's AsUniTask.

A single generic extension AsAsyncEnumerable<TChunk>(this
ReadIncrementalInstructionBase<TChunk>) covers both byte[] and string readers.
The loop mirrors the coroutine consumer's observable behavior: await a chunk,
yield it, re-check IsEos AFTER yielding (Reset() is disallowed past EoS), and
Reset() for the next chunk. On EoS carrying a StreamError the enumerable throws
that error — idiomatic for await foreach, the one place the UniTask surface
throws rather than exposing IsError. Cancellation surfaces as
OperationCanceledException with abandon-awaiter semantics.

To let the separate LiveKit.UniTask assembly drive the loop, two members are
widened to public (both already public on the sibling DataTrack.ReadFrameInstruction,
behavior-preserving): StreamYieldInstruction.IsCurrentReadDone getter and
ReadIncrementalInstructionBase<T>.LatestChunk. The runtime and test UniTask
asmdefs gain a UniTask.Linq reference (source of UniTaskAsyncEnumerable.Create /
IUniTaskAsyncEnumerable), and InternalsVisibleTo is extended to the
PlayModeTests.UniTask assembly so the deterministic tests can construct a
synthetic reader (the same FfiHandle-based seam the EditMode tests use).

DataTrack frame streaming is intentionally out of scope (its ReadFrameInstruction
has no awaiter and no Reset) — a possible follow-up.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant