Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions api/src/org/labkey/api/data/MaterializedQueryHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Strings;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
Expand All @@ -29,6 +30,8 @@
import org.labkey.api.test.TestWhen;
import org.labkey.api.util.GUID;
import org.labkey.api.util.HeartBeat;
import org.labkey.api.util.JobRunner;
import org.labkey.api.util.logging.LogHelper;
import org.labkey.api.util.MemTracker;
import org.labkey.api.util.UnexpectedException;

Expand All @@ -42,6 +45,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -56,6 +60,8 @@
*/
public class MaterializedQueryHelper implements CacheListener, AutoCloseable
{
private static final Logger LOG = LogHelper.getLogger(MaterializedQueryHelper.class, "Materialized query helper");

public static class Materialized
{
MaterializedQueryHelper _mqh;
Expand Down Expand Up @@ -106,6 +112,20 @@ public void reset()
i.stillValid(now);
}

/**
* Returns true if calling getFromSql() would perform synchronous DB work (full rebuild via SELECT INTO,
* or incremental update SQL). This check is non-destructive: it does not consume invalidation state.
* Subclasses may override to check additional invalidation state (also non-destructively).
*/
public boolean needsSynchronousWork()
{
for (Invalidator i : _invalidators)
{
if (!i.peekValid(_created))
return true;
}
return false;
}

/** return false if we did not acquire the loadingLock */
boolean load(SQLFragment selectQuery, boolean isSelectInto)
Expand Down Expand Up @@ -205,6 +225,20 @@ public CacheCheck checkValid(long createdTime)
return CacheCheck.COALESCE;
}
public abstract boolean stillValid(long createdTime);

/**
* Non-destructive validity check. Returns false if the entry should be considered stale without updating
* any stored state. Use this in read-only probes (e.g. {@link Materialized#needsSynchronousWork()}) to avoid
* consuming invalidation state before the actual update logic has run.
*
* <p>The default returns {@code true} (assume valid). Subclasses that can cheaply determine staleness without
* side effects should override this. {@link SqlInvalidator} intentionally keeps the default because a true
* non-destructive check would still require a DB round-trip and is not worth the extra query.
*/
public boolean peekValid(long createdTime)
{
return true;
}
}


Expand Down Expand Up @@ -248,6 +282,12 @@ public boolean stillValid(long createdTime)
{
return _maxTime != -1 && createdTime + _maxTime > HeartBeat.currentTimeMillis();
}

@Override
public boolean peekValid(long createdTime)
{
return stillValid(createdTime); // pure time arithmetic; non-destructive
}
}


Expand All @@ -271,6 +311,23 @@ public boolean stillValid(long createdTime)
_result.set(newResult);
return false;
}

/**
* Non-destructive validity check: returns false if the supplier value has changed since the last
* {@link #stillValid} or {@link #reset} call, without updating the stored snapshot.
* Use this in read-only staleness checks (e.g. {@code needsSynchronousWork()}) to avoid consuming
* the invalidation before the incremental-update logic has had a chance to run.
*/
public boolean peekValid()
{
return Strings.CS.equals(_result.get(), _supplier.get());
}

@Override
public boolean peekValid(long createdTime)
{
return peekValid();
}
}


Expand Down Expand Up @@ -304,6 +361,7 @@ protected boolean removeEldestEntry(Map.Entry<String, Materialized> eldest)
private final AtomicInteger _countGetFromSql = new AtomicInteger();
private final AtomicInteger _countSelectInto = new AtomicInteger();
private final AtomicLong _lastUsed = new AtomicLong(HeartBeat.currentTimeMillis());
private final AtomicBoolean _backgroundTaskRunning = new AtomicBoolean(false);

private boolean _closed = false;

Expand Down Expand Up @@ -338,6 +396,69 @@ public void clearCaches()
_map.clear();
}

/**
* Returns true if the global (non-transactional) materialized view is LOADED and has no pending synchronous work.
* Use this to decide whether to use the fast materialized path or fall back to direct JOINs.
*/
public boolean isReadyToUse()
{
Materialized m = _map.get(makeKey(null));
if (m == null || m._loadingState.get() != Materialized.LoadingState.LOADED)
return false;
return !m.needsSynchronousWork();
}

/**
* Submits a background task to build or incrementally update the materialized view.
* Only one background task runs at a time per MQH instance (guarded by CAS on _backgroundTaskRunning).
*/
public void materializeAsync()
{
if (_backgroundTaskRunning.compareAndSet(false, true))
{
JobRunner.getDefault().execute(() -> {
try
{
getFromSql("_bg_");
}
catch (Exception e)
{
LOG.warn("Background materialization failed: " + _prefix, e);
}
finally
{
_backgroundTaskRunning.set(false);
}
});
}
}

/**
* Non-blocking variant of {@link #getFromSql(String)}: returns a SQL fragment referencing the cached materialized
* temp table only if the view is currently LOADED with no pending synchronous work, without triggering any rebuild
* or incremental-update SQL. Returns {@code null} if the view is not available (not yet built, still loading, or
* stale), so the caller can fall back immediately without blocking.
*
* <p>This is the safe way to use the materialized path in contexts where blocking is unacceptable and the caller
* has already decided to use the materialized view based on an earlier {@link #isReadyToUse()} check. By re-checking
* the ready condition at use time, it closes the TOCTOU window between the snapshot decision and the actual SQL
* construction.
*/
public @Nullable SQLFragment tryGetFromSqlIfLoaded(String tableAlias)
{
Materialized m = _map.get(makeKey(null));
if (m == null || m._loadingState.get() != Materialized.LoadingState.LOADED)
return null;
if (m.needsSynchronousWork())
return null;
_lastUsed.set(HeartBeat.currentTimeMillis());
SQLFragment sqlf = new SQLFragment(m._fromSql);
if (!StringUtils.isBlank(tableAlias))
sqlf.append(" ").append(tableAlias);
sqlf.addTempToken(m);
return sqlf;
}

/**
* NOTE: invalidating within a transaction, may NOT force re-materialize for subsequent call within the transaction
* because it could re-use the global cached result.
Expand Down Expand Up @@ -737,5 +858,88 @@ private void _join(List<Thread> list)
{
list.forEach((t) -> {try{t.join();}catch(InterruptedException x){/* */}});
}

@Test
public void testPeekValidNonDestructive()
{
// Arrange: supplier starts at "A", reset stores "A"
AtomicReference<String> supplierValue = new AtomicReference<>("A");
SupplierInvalidator inv = new SupplierInvalidator(supplierValue::get);
inv.stillValid(0); // seed the stored snapshot to "A"

// Verify valid when unchanged
assertTrue("peekValid should return true when supplier unchanged", inv.peekValid());
assertTrue("peekValid(long) should return true when supplier unchanged", inv.peekValid(0));

// Change supplier without calling stillValid
supplierValue.set("B");

// peekValid should detect the change
assertFalse("peekValid should return false after supplier changes", inv.peekValid());
assertFalse("peekValid(long) should return false after supplier changes", inv.peekValid(0));

// Critically: calling peekValid must NOT have consumed the invalidation.
// stillValid should still return false (snapshot is still "A", supplier is "B").
assertFalse("stillValid must still return false after peekValid — invalidation was not consumed", inv.stillValid(0));
}

@Test
public void testNeedsSynchronousWorkNonDestructive()
{
DbSchema temp = DbSchema.getTemp();
DbScope s = temp.getScope();
AtomicReference<String> supplierValue = new AtomicReference<>("v1");
SQLFragment select = new SQLFragment("SELECT * FROM temp.MQH_TESTCASE");

try (MaterializedQueryHelper mqh = new Builder("test", s, select)
.addInvalidCheck(supplierValue::get)
.build())
{
// Materialize once so m is LOADED and snapshot is "v1"
mqh.getFromSql("_");

Materialized m = mqh._map.get(mqh.makeKey(null));
assertNotNull(m);
assertFalse("should not need synchronous work right after materialization", m.needsSynchronousWork());

// Invalidate by changing the supplier
supplierValue.set("v2");
assertTrue("should need synchronous work after supplier changes", m.needsSynchronousWork());

// Call needsSynchronousWork a second time — must still report stale (state was not consumed)
assertTrue("needsSynchronousWork must remain true; state must not have been consumed", m.needsSynchronousWork());

// Consuming the state via getFromSql triggers a rebuild; afterwards no synchronous work needed
mqh.getFromSql("_");
Materialized m2 = mqh._map.get(mqh.makeKey(null));
assertNotNull(m2);
assertFalse("no synchronous work needed after rebuild", m2.needsSynchronousWork());
}
}

@Test
public void testIsReadyToUse()
{
DbSchema temp = DbSchema.getTemp();
DbScope s = temp.getScope();
AtomicReference<String> supplierValue = new AtomicReference<>("v1");
SQLFragment select = new SQLFragment("SELECT * FROM temp.MQH_TESTCASE");

try (MaterializedQueryHelper mqh = new Builder("test", s, select)
.addInvalidCheck(supplierValue::get)
.build())
{
// Before any materialization: not ready
assertFalse("isReadyToUse must return false before first materialization", mqh.isReadyToUse());

// After materialization: ready
mqh.getFromSql("_");
assertTrue("isReadyToUse must return true after materialization", mqh.isReadyToUse());

// After invalidation: not ready (needs synchronous work)
supplierValue.set("v2");
assertFalse("isReadyToUse must return false when supplier has changed", mqh.isReadyToUse());
}
}
}
}
Loading