diff --git a/api/src/org/labkey/api/data/MaterializedQueryHelper.java b/api/src/org/labkey/api/data/MaterializedQueryHelper.java index de63430869b..f38c59c459b 100644 --- a/api/src/org/labkey/api/data/MaterializedQueryHelper.java +++ b/api/src/org/labkey/api/data/MaterializedQueryHelper.java @@ -17,6 +17,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Strings; +import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.After; @@ -29,6 +30,8 @@ import org.labkey.api.test.TestWhen; import org.labkey.api.util.GUID; import org.labkey.api.util.HeartBeat; +import org.labkey.api.util.JobRunner; +import org.labkey.api.util.logging.LogHelper; import org.labkey.api.util.MemTracker; import org.labkey.api.util.UnexpectedException; @@ -42,6 +45,7 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -56,6 +60,8 @@ */ public class MaterializedQueryHelper implements CacheListener, AutoCloseable { + private static final Logger LOG = LogHelper.getLogger(MaterializedQueryHelper.class, "Materialized query helper"); + public static class Materialized { MaterializedQueryHelper _mqh; @@ -106,6 +112,20 @@ public void reset() i.stillValid(now); } + /** + * Returns true if calling getFromSql() would perform synchronous DB work (full rebuild via SELECT INTO, + * or incremental update SQL). This check is non-destructive: it does not consume invalidation state. + * Subclasses may override to check additional invalidation state (also non-destructively). + */ + public boolean needsSynchronousWork() + { + for (Invalidator i : _invalidators) + { + if (!i.peekValid(_created)) + return true; + } + return false; + } /** return false if we did not acquire the loadingLock */ boolean load(SQLFragment selectQuery, boolean isSelectInto) @@ -205,6 +225,20 @@ public CacheCheck checkValid(long createdTime) return CacheCheck.COALESCE; } public abstract boolean stillValid(long createdTime); + + /** + * Non-destructive validity check. Returns false if the entry should be considered stale without updating + * any stored state. Use this in read-only probes (e.g. {@link Materialized#needsSynchronousWork()}) to avoid + * consuming invalidation state before the actual update logic has run. + * + *

The default returns {@code true} (assume valid). Subclasses that can cheaply determine staleness without + * side effects should override this. {@link SqlInvalidator} intentionally keeps the default because a true + * non-destructive check would still require a DB round-trip and is not worth the extra query. + */ + public boolean peekValid(long createdTime) + { + return true; + } } @@ -248,6 +282,12 @@ public boolean stillValid(long createdTime) { return _maxTime != -1 && createdTime + _maxTime > HeartBeat.currentTimeMillis(); } + + @Override + public boolean peekValid(long createdTime) + { + return stillValid(createdTime); // pure time arithmetic; non-destructive + } } @@ -271,6 +311,23 @@ public boolean stillValid(long createdTime) _result.set(newResult); return false; } + + /** + * Non-destructive validity check: returns false if the supplier value has changed since the last + * {@link #stillValid} or {@link #reset} call, without updating the stored snapshot. + * Use this in read-only staleness checks (e.g. {@code needsSynchronousWork()}) to avoid consuming + * the invalidation before the incremental-update logic has had a chance to run. + */ + public boolean peekValid() + { + return Strings.CS.equals(_result.get(), _supplier.get()); + } + + @Override + public boolean peekValid(long createdTime) + { + return peekValid(); + } } @@ -304,6 +361,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) private final AtomicInteger _countGetFromSql = new AtomicInteger(); private final AtomicInteger _countSelectInto = new AtomicInteger(); private final AtomicLong _lastUsed = new AtomicLong(HeartBeat.currentTimeMillis()); + private final AtomicBoolean _backgroundTaskRunning = new AtomicBoolean(false); private boolean _closed = false; @@ -338,6 +396,69 @@ public void clearCaches() _map.clear(); } + /** + * Returns true if the global (non-transactional) materialized view is LOADED and has no pending synchronous work. + * Use this to decide whether to use the fast materialized path or fall back to direct JOINs. + */ + public boolean isReadyToUse() + { + Materialized m = _map.get(makeKey(null)); + if (m == null || m._loadingState.get() != Materialized.LoadingState.LOADED) + return false; + return !m.needsSynchronousWork(); + } + + /** + * Submits a background task to build or incrementally update the materialized view. + * Only one background task runs at a time per MQH instance (guarded by CAS on _backgroundTaskRunning). + */ + public void materializeAsync() + { + if (_backgroundTaskRunning.compareAndSet(false, true)) + { + JobRunner.getDefault().execute(() -> { + try + { + getFromSql("_bg_"); + } + catch (Exception e) + { + LOG.warn("Background materialization failed: " + _prefix, e); + } + finally + { + _backgroundTaskRunning.set(false); + } + }); + } + } + + /** + * Non-blocking variant of {@link #getFromSql(String)}: returns a SQL fragment referencing the cached materialized + * temp table only if the view is currently LOADED with no pending synchronous work, without triggering any rebuild + * or incremental-update SQL. Returns {@code null} if the view is not available (not yet built, still loading, or + * stale), so the caller can fall back immediately without blocking. + * + *

This is the safe way to use the materialized path in contexts where blocking is unacceptable and the caller + * has already decided to use the materialized view based on an earlier {@link #isReadyToUse()} check. By re-checking + * the ready condition at use time, it closes the TOCTOU window between the snapshot decision and the actual SQL + * construction. + */ + public @Nullable SQLFragment tryGetFromSqlIfLoaded(String tableAlias) + { + Materialized m = _map.get(makeKey(null)); + if (m == null || m._loadingState.get() != Materialized.LoadingState.LOADED) + return null; + if (m.needsSynchronousWork()) + return null; + _lastUsed.set(HeartBeat.currentTimeMillis()); + SQLFragment sqlf = new SQLFragment(m._fromSql); + if (!StringUtils.isBlank(tableAlias)) + sqlf.append(" ").append(tableAlias); + sqlf.addTempToken(m); + return sqlf; + } + /** * NOTE: invalidating within a transaction, may NOT force re-materialize for subsequent call within the transaction * because it could re-use the global cached result. @@ -737,5 +858,88 @@ private void _join(List list) { list.forEach((t) -> {try{t.join();}catch(InterruptedException x){/* */}}); } + + @Test + public void testPeekValidNonDestructive() + { + // Arrange: supplier starts at "A", reset stores "A" + AtomicReference supplierValue = new AtomicReference<>("A"); + SupplierInvalidator inv = new SupplierInvalidator(supplierValue::get); + inv.stillValid(0); // seed the stored snapshot to "A" + + // Verify valid when unchanged + assertTrue("peekValid should return true when supplier unchanged", inv.peekValid()); + assertTrue("peekValid(long) should return true when supplier unchanged", inv.peekValid(0)); + + // Change supplier without calling stillValid + supplierValue.set("B"); + + // peekValid should detect the change + assertFalse("peekValid should return false after supplier changes", inv.peekValid()); + assertFalse("peekValid(long) should return false after supplier changes", inv.peekValid(0)); + + // Critically: calling peekValid must NOT have consumed the invalidation. + // stillValid should still return false (snapshot is still "A", supplier is "B"). + assertFalse("stillValid must still return false after peekValid — invalidation was not consumed", inv.stillValid(0)); + } + + @Test + public void testNeedsSynchronousWorkNonDestructive() + { + DbSchema temp = DbSchema.getTemp(); + DbScope s = temp.getScope(); + AtomicReference supplierValue = new AtomicReference<>("v1"); + SQLFragment select = new SQLFragment("SELECT * FROM temp.MQH_TESTCASE"); + + try (MaterializedQueryHelper mqh = new Builder("test", s, select) + .addInvalidCheck(supplierValue::get) + .build()) + { + // Materialize once so m is LOADED and snapshot is "v1" + mqh.getFromSql("_"); + + Materialized m = mqh._map.get(mqh.makeKey(null)); + assertNotNull(m); + assertFalse("should not need synchronous work right after materialization", m.needsSynchronousWork()); + + // Invalidate by changing the supplier + supplierValue.set("v2"); + assertTrue("should need synchronous work after supplier changes", m.needsSynchronousWork()); + + // Call needsSynchronousWork a second time — must still report stale (state was not consumed) + assertTrue("needsSynchronousWork must remain true; state must not have been consumed", m.needsSynchronousWork()); + + // Consuming the state via getFromSql triggers a rebuild; afterwards no synchronous work needed + mqh.getFromSql("_"); + Materialized m2 = mqh._map.get(mqh.makeKey(null)); + assertNotNull(m2); + assertFalse("no synchronous work needed after rebuild", m2.needsSynchronousWork()); + } + } + + @Test + public void testIsReadyToUse() + { + DbSchema temp = DbSchema.getTemp(); + DbScope s = temp.getScope(); + AtomicReference supplierValue = new AtomicReference<>("v1"); + SQLFragment select = new SQLFragment("SELECT * FROM temp.MQH_TESTCASE"); + + try (MaterializedQueryHelper mqh = new Builder("test", s, select) + .addInvalidCheck(supplierValue::get) + .build()) + { + // Before any materialization: not ready + assertFalse("isReadyToUse must return false before first materialization", mqh.isReadyToUse()); + + // After materialization: ready + mqh.getFromSql("_"); + assertTrue("isReadyToUse must return true after materialization", mqh.isReadyToUse()); + + // After invalidation: not ready (needs synchronous work) + supplierValue.set("v2"); + assertFalse("isReadyToUse must return false when supplier has changed", mqh.isReadyToUse()); + } + } } } diff --git a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java index ed99b811e37..926d0fb3668 100644 --- a/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java +++ b/experiment/src/org/labkey/experiment/api/ExpMaterialTableImpl.java @@ -176,6 +176,15 @@ public class ExpMaterialTableImpl extends ExpRunItemTableImpl _uniqueIdFields; boolean _supportTableRules = true; + /** + * Snapshots the {@link MaterializedQueryHelper#isReadyToUse()} decision on the first call to + * {@link #getFromSQLExpanded} and reuses it for all subsequent calls on this instance. + * This ensures that every SQL fragment built during a single query construction uses the same + * form (materialized temp table vs. direct JOIN), preventing inconsistent SQL when a background + * materialization completes between two calls to {@code getFromSQL} on the same lookup target. + * Null means the decision has not yet been made for this instance. + */ + private volatile Boolean _mqhReadySnapshot = null; private static final Set MATERIAL_ALT_KEYS; private static final List AMOUNT_RANGE_VALIDATORS = new ArrayList<>(); @@ -1187,10 +1196,56 @@ public SQLFragment getFromSQLExpanded(String alias, Set selectedColumn boolean onlyMaterialColums = false; if (null != selectedColumns && !selectedColumns.isEmpty()) onlyMaterialColums = selectedColumns.stream().allMatch(fk -> fk.getName().equalsIgnoreCase("Folder") || null != _rootTable.getColumn(fk)); - if (!onlyMaterialColums && null != _ss && null != _ss.getTinfo() && !getExpSchema().getDbSchema().getScope().isTransactionActive()) - { - sql.append(getMaterializedSQL()); - usedMaterialized = true; + if (!onlyMaterialColums && null != _ss && null != _ss.getTinfo() + && !getExpSchema().getDbSchema().getScope().isTransactionActive()) + { + _MaterializedQueryHelper mqh = getOrCreateMQH(); + if (mqh != null) + { + // Snapshot the materialization decision on first call; reuse on all subsequent calls + // within the same TableInfo instance (i.e., the same query-construction scope). + // This prevents a race where a background build completes between two getFromSQL + // calls for the same lookup target, which would otherwise produce inconsistent SQL + // fragments (one materialized, one not) for the same table alias. + Boolean ready = _mqhReadySnapshot; + if (ready == null) + { + ready = mqh.isReadyToUse(); + _mqhReadySnapshot = ready; + } + if (ready) + { + // Fast path: snapshot said LOADED with no pending work. + // Re-check non-blockingly to close the TOCTOU window: the view may have been + // invalidated between the snapshot decision and now. + SQLFragment tempRef = mqh.tryGetFromSqlIfLoaded("_cached_view_"); + if (tempRef != null) + { + sql.append(new SQLFragment("SELECT * FROM ").append(tempRef)); + usedMaterialized = true; + } + else + { + // Became stale after the snapshot; trigger a rebuild and fall back immediately. + mqh.materializeAsync(); + sql.append(getJoinSQL(selectedColumns)); + usedMaterialized = false; + } + } + else + { + // View not built yet, or stale (incremental updates or full rebuild pending): + // trigger background work and fall back to direct JOINs immediately. + mqh.materializeAsync(); + sql.append(getJoinSQL(selectedColumns)); + usedMaterialized = false; + } + } + else + { + sql.append(getJoinSQL(selectedColumns)); + usedMaterialized = false; + } } else { @@ -1349,13 +1404,17 @@ private static InvalidationCounters getInvalidateCounters(String lsid) return _invalidationCounters.computeIfAbsent(lsid, (_) -> new InvalidationCounters()); } - /* SELECT and JOIN, does not include WHERE, same as getJoinSQL() */ - private SQLFragment getMaterializedSQL() + /** + * Gets or creates the {@code _MaterializedQueryHelper} for the current sample type from the blocking cache. + * This is cheap: it creates the MQH configuration but does NOT trigger a SELECT INTO or any incremental SQL. + * Returns null if there is no sample type ({@code _ss} is null). + */ + private _MaterializedQueryHelper getOrCreateMQH() { if (null == _ss) - return getJoinSQL(null); + return null; - var mqh = _materializedQueries.get(_ss.getLSID(), null, (unusedKey, unusedArg) -> + return _materializedQueries.get(_ss.getLSID(), null, (unusedKey, unusedArg) -> { /* NOTE: MaterializedQueryHelper does have a pattern to help with detecting schema changes. * Previously it has been used on non-provisioned tables. It might be helpful to have a pattern, @@ -1377,6 +1436,14 @@ private SQLFragment getMaterializedSQL() return (_MaterializedQueryHelper) builder.build(); }); + } + + /** + * Calls {@code mqh.getFromSql()} (which runs any pending incremental updates synchronously) and returns + * the SQL fragment referencing the temp table. Should only be called when the view is ready to use. + */ + private SQLFragment getMaterializedSQL(_MaterializedQueryHelper mqh) + { return new SQLFragment("SELECT * FROM ").append(mqh.getFromSql("_cached_view_")); } @@ -1404,6 +1471,18 @@ private static boolean isIncrementalUpdateDisabled() return _incrementalUpdateDisabled; } + /** + * Clears all cached {@code _MaterializedQueryHelper} instances. On the next request per sample type, + * {@code getOrCreateMQH()} recreates a fresh MQH, {@code isReadyToUse()} returns false, and + * {@code materializeAsync()} triggers a background rebuild. + *

+ * Intended for admin maintenance (e.g. {@code ClearMaterializedSamplesViewAction}). + */ + public static void clearAllMaterializedViews() + { + _materializedQueries.clear(); + } + /** * MaterializedQueryHelper has a built-in mechanism for tracking when a temp table needs to be recomputed. * It does not help with incremental updates (except for providing the upsert() method). @@ -1631,10 +1710,10 @@ private void appendSetFromSrc(SQLFragment sql) static class _Materialized extends MaterializedQueryHelper.Materialized { - final MaterializedQueryHelper.Invalidator incrementalInsertCheck; - final MaterializedQueryHelper.Invalidator incrementalRollupCheck; - final MaterializedQueryHelper.Invalidator incrementalDeleteCheck; - final MaterializedQueryHelper.Invalidator incrementalUpdateCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalInsertCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalRollupCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalDeleteCheck; + final MaterializedQueryHelper.SupplierInvalidator incrementalUpdateCheck; _Materialized(_MaterializedQueryHelper mqh, String tableName, String cacheKey, long created, String sql) { @@ -1657,6 +1736,21 @@ public void reset() incrementalUpdateCheck.stillValid(now); } + /** + * Returns true if any incremental counter has changed since the last reset, indicating that + * {@code incrementalUpdateBeforeSelect()} would perform synchronous DB work. + * Uses non-destructive {@code peekValid()} so the background task can still apply the updates. + */ + @Override + public boolean needsSynchronousWork() + { + if (super.needsSynchronousWork()) return true; + return !incrementalInsertCheck.peekValid() + || !incrementalDeleteCheck.peekValid() + || !incrementalUpdateCheck.peekValid() + || !incrementalRollupCheck.peekValid(); + } + Lock getLock() { return _loadingLock; @@ -2287,12 +2381,14 @@ private void mergeRows(ExpSampleType st, List> rows) throws } /** - * Trigger materialization (and any pending incremental update) as a side effect of {@link #getMaterializedSQL()}, - * then assert the cached temp table equals a fresh derivation of the join query in both directions. + * Trigger materialization (and any pending incremental update) synchronously, then assert the cached + * temp table equals a fresh derivation of the join query in both directions. */ private void assertCacheMatchesFreshDerivation(ExpMaterialTableImpl table, String lsid) { - SQLFragment cached = table.getMaterializedSQL(); // builds/refreshes the temp table via the normal read path + _MaterializedQueryHelper mqh = table.getOrCreateMQH(); + assertNotNull("MQH should not be null for a sample type table", mqh); + SQLFragment cached = table.getMaterializedSQL(mqh); // builds/refreshes the temp table via the normal read path SQLFragment fresh = table.getJoinSQL(null).append(" WHERE CpasType = ").appendValue(lsid); DbScope scope = ExperimentServiceImpl.getExpSchema().getScope(); assertEquals("Cached rows not found in fresh derivation", 0, countDiff(scope, cached, fresh)); diff --git a/experiment/src/org/labkey/experiment/controllers/exp/ExperimentController.java b/experiment/src/org/labkey/experiment/controllers/exp/ExperimentController.java index ac89d3f9d0d..962bbaa71c0 100644 --- a/experiment/src/org/labkey/experiment/controllers/exp/ExperimentController.java +++ b/experiment/src/org/labkey/experiment/controllers/exp/ExperimentController.java @@ -286,6 +286,7 @@ import org.labkey.experiment.api.ExpDataImpl; import org.labkey.experiment.api.ExpExperimentImpl; import org.labkey.experiment.api.ExpMaterialImpl; +import org.labkey.experiment.api.ExpMaterialTableImpl; import org.labkey.experiment.api.ExpProtocolApplicationImpl; import org.labkey.experiment.api.ExpProtocolImpl; import org.labkey.experiment.api.ExpRunImpl; @@ -7338,6 +7339,19 @@ public Object execute(Object form, BindException errors) } + @Marshal(Marshaller.Jackson) + @RequiresPermission(AdminPermission.class) + public static class ClearMaterializedSamplesViewAction extends MutatingApiAction + { + @Override + public Object execute(Object form, BindException errors) + { + ExpMaterialTableImpl.clearAllMaterializedViews(); + return success("Materialized sample views cleared. They will be rebuilt asynchronously on next access."); + } + } + + @Marshal(Marshaller.Jackson) @RequiresPermission(AdminPermission.class) public static class CheckDataClassesIndexedAction extends ReadOnlyApiAction