Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc_omit
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ omit =
src/vitessce/wrappers.py
src/vitessce/repr.py
src/vitessce/utils.py
src/vitessce/sync_store.py
src/vitessce/data_utils/anndata.py
src/vitessce/data_utils/ome.py
src/vitessce/data_utils/entities.py
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "vitessce"
version = "3.9.1"
version = "3.9.2"
authors = [
{ name="Mark Keller", email="mark_keller@hms.harvard.edu" },
]
Expand Down
68 changes: 68 additions & 0 deletions src/vitessce/sync_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Copied from zarr-python
# Reference: https://github.com/zarr-developers/zarr-python/blob/fe229107f9915f05817f7a664d3550695ff9ca44/src/zarr/testing/stateful.py#L438

import builtins
from typing import Any
import zarr
from zarr.abc.store import Store
from zarr.core.buffer import Buffer, BufferPrototype


class SyncStoreWrapper(zarr.core.sync.SyncMixin):
def __init__(self, store: Store) -> None:
"""Synchronous Store wrapper

This class holds synchronous methods that map to async methods of Store classes.
The synchronous wrapper is needed because hypothesis' stateful testing infra does
not support asyncio so we redefine sync versions of the Store API.
https://github.com/HypothesisWorks/hypothesis/issues/3712#issuecomment-1668999041
"""
self.store = store

@property
def read_only(self) -> bool:
return self.store.read_only

def set(self, key: str, data_buffer: Buffer) -> None:
return self._sync(self.store.set(key, data_buffer))

def list(self) -> builtins.list[str]:
return self._sync_iter(self.store.list())

def get(self, key: str, prototype: BufferPrototype, **kwargs) -> Buffer | None:
return self._sync(self.store.get(key, prototype=prototype, **kwargs))

def get_partial_values(
self, key_ranges: builtins.list[Any], prototype: BufferPrototype
) -> builtins.list[Buffer | None]:
return self._sync(self.store.get_partial_values(prototype=prototype, key_ranges=key_ranges))

def delete(self, path: str) -> None:
return self._sync(self.store.delete(path))

def is_empty(self, prefix: str) -> bool:
return self._sync(self.store.is_empty(prefix=prefix))

def clear(self) -> None:
return self._sync(self.store.clear())

def exists(self, key: str) -> bool:
return self._sync(self.store.exists(key))

def list_dir(self, prefix: str) -> None:
raise NotImplementedError

def list_prefix(self, prefix: str) -> None:
raise NotImplementedError

@property
def supports_listing(self) -> bool:
return self.store.supports_listing

@property
def supports_writes(self) -> bool:
return self.store.supports_writes

@property
def supports_deletes(self) -> bool:
return self.store.supports_deletes
39 changes: 16 additions & 23 deletions src/vitessce/widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import anywidget
from traitlets import Unicode, Dict, List, Int, Bool

import asyncio
from zarr.abc.store import RangeByteRequest, SuffixByteRequest
from zarr.core.buffer.core import default_buffer_prototype
from .sync_store import SyncStoreWrapper


MAX_PORT_TRIES = 1000
Expand Down Expand Up @@ -912,11 +912,11 @@ def close(self):
super().close()

# @anywidget.experimental.command
async def _zarr_get(self, params, buffers):
def _zarr_get(self, params, buffers):
[store_url, key] = params
store = self._stores[store_url]
store = SyncStoreWrapper(self._stores[store_url])
try:
result = await store.get(key.lstrip("/"), prototype=default_buffer_prototype())
result = store.get(key.lstrip("/"), prototype=default_buffer_prototype())
if result is None:
buffers = []
else:
Expand All @@ -927,9 +927,9 @@ async def _zarr_get(self, params, buffers):
return {"success": len(buffers) == 1}, buffers

# @anywidget.experimental.command
async def _zarr_get_range(self, params, buffers):
def _zarr_get_range(self, params, buffers):
[store_url, key, range_query] = params
store = self._stores[store_url]
store = SyncStoreWrapper(self._stores[store_url])
try:
range_param = None
# Reference: https://github.com/manzt/zarrita.js/blob/f63a2521e2b46b22aa26af4146822e4d827dff83/packages/%40zarrita-storage/src/types.ts#L3
Expand All @@ -943,7 +943,7 @@ async def _zarr_get_range(self, params, buffers):
else:
raise ValueError(f"Invalid range query: {range_query}. Must contain either 'suffixLength' or both 'offset' and 'length'.")

result = await store.get(key, byte_range=range_param, prototype=default_buffer_prototype())
result = store.get(key, byte_range=range_param, prototype=default_buffer_prototype())
if result is None:
buffers = []
else:
Expand All @@ -954,16 +954,16 @@ async def _zarr_get_range(self, params, buffers):
return {"success": len(buffers) == 1}, buffers

# @anywidget.experimental.command
async def _zarr_get_multi(self, params_arr, buffers):
def _zarr_get_multi(self, params_arr, buffers):
# This variant of _zarr_get and _zarr_get_range supports batching.
result_dicts = []
result_buffers = []
for params in params_arr:
result_dict, result_buffer_arr = {}, []
if len(params) == 2:
result_dict, result_buffer_arr = await self._zarr_get(params, buffers)
result_dict, result_buffer_arr = self._zarr_get(params, buffers)
elif len(params) == 3:
result_dict, result_buffer_arr = await self._zarr_get_range(params, buffers)
result_dict, result_buffer_arr = self._zarr_get_range(params, buffers)
else:
raise ValueError("Expected params to have len 2 or 3 in _zarr_get_multi")
if result_dict["success"] and len(result_buffer_arr) == 1:
Expand All @@ -986,28 +986,21 @@ def _handle_msg(self, msg: dict) -> None:
if content.get("kind") != "anywidget-command":
super()._handle_msg(msg)
return
try:
loop = asyncio.get_event_loop()
except RuntimeError:
return
if loop.is_running():
loop.create_task(self._dispatch_command(content, buffers))
else:
loop.run_until_complete(self._dispatch_command(content, buffers))
self._dispatch_command(content, buffers)

async def _dispatch_command(self, msg: dict, buffers: list[bytes]) -> None:
def _dispatch_command(self, msg: dict, buffers: list[bytes]) -> None:
name = msg.get("name")
params = msg.get("msg")
msg_id = msg.get("id")
try:
if name == "_zarr_get":
response, result_buffers = await self._zarr_get(params, buffers)
response, result_buffers = self._zarr_get(params, buffers)
elif name == "_zarr_get_range":
response, result_buffers = await self._zarr_get_range(params, buffers)
response, result_buffers = self._zarr_get_range(params, buffers)
elif name == "_zarr_get_multi":
response, result_buffers = await self._zarr_get_multi(params, buffers)
response, result_buffers = self._zarr_get_multi(params, buffers)
elif name == "_plugin_command":
response, result_buffers = await self._plugin_command(params, buffers)
response, result_buffers = self._plugin_command(params, buffers)
else:
return
except Exception as exc: # noqa: BLE001
Expand Down
Loading