From 7d3bfee6d51efe00c44553530178387f90d9cd43 Mon Sep 17 00:00:00 2001 From: cgivre Date: Fri, 26 Jun 2026 11:22:45 -0400 Subject: [PATCH] DRILL-8239: Convert convert_fromJSON UDF to EVF Rewrite the convert_fromJSON UDFs to use the EVF JsonLoader (ResultSetLoader) instead of the legacy JsonReader, mirroring the HTTP storage plugin UDFs. JsonConverterUtils builds the loader from either the system JSON options or the explicit allTextMode/readNumbersAsDouble arguments, and centralises the per-row conversion. To preserve the full convert_fromJSON contract, the EVF complex-writer support in ProjectRecordBatch is extended: * Multiple complex-writer functions per project list. addLoader now keeps a list of (loader, output-column) pairs -- captured at codegen in DrillComplexWriterFuncHolder -- so each loader's output lands in the column reserved for it (fixes SELECT convert_from(a) m1, convert_from(b) m2 and cases that project columns before/after the function). * Top-level scalars and arrays. The UDF wraps each input value in a single marker field so the record-oriented loader reads {scalar, array, object} uniformly; ProjectRecordBatch unwraps that marker column by transferring it directly (preserving the value's own type), and otherwise wraps the loader's columns in a map (the HTTP-UDF behaviour). * Per-output-batch lifecycle. Loaders are re-started before each batch, fixing the "Unexpected state: HARVESTED" failure on multi-row/multi-batch input. * Null/empty input writes an aligned (null) row so the loader row count matches the surrounding batch. 
--- .../expr/fn/DrillComplexWriterFuncHolder.java | 2 +- .../expr/fn/impl/conv/JsonConvertFrom.java | 298 +++++++----------- .../expr/fn/impl/conv/JsonConverterUtils.java | 180 +++++++++++ .../impl/project/ProjectBatchBuilder.java | 7 +- .../impl/project/ProjectRecordBatch.java | 111 +++++-- 5 files changed, 387 insertions(+), 211 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java index 4f6491c0920..8b468019bae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java @@ -87,7 +87,7 @@ protected HoldingContainer generateEvalBody(ClassGenerator classGenerator, Ho classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex())); sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter); } else { - classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader)); + classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader).arg(JExpr.lit(refName))); sub.decl(classGenerator.getModel()._ref(ResultSetLoader.class), getReturnValue().getName(), rsLoader); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 61e64a55dcc..a64ce5b2f18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -18,7 +18,6 @@ package 
org.apache.drill.exec.expr.fn.impl.conv; -import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; @@ -30,11 +29,13 @@ import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import javax.inject.Inject; +@SuppressWarnings("unused") public class JsonConvertFrom { private JsonConvertFrom() { @@ -46,39 +47,31 @@ public static class ConvertFromJson implements DrillSimpleFunc { @Param VarBinaryHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - 
buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, false, false, false); } } @@ -94,79 +87,66 @@ public static class ConvertFromJsonWithArgs implements DrillSimpleFunc { @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + + @Inject + OptionManager options; + @Inject - DrillBuf buffer; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) public static class ConvertFromJsonVarchar implements DrillSimpleFunc { @Param VarCharHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, false, false, false); } } @@ -182,36 +162,32 @@ public static class ConvertFromJsonVarcharWithConfig implements DrillSimpleFunc @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + @Inject - DrillBuf buffer; + OptionManager options; + + @Inject + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } @@ -221,47 +197,31 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Param NullableVarBinaryHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, false, false, false); } } @@ -277,44 +237,32 @@ public static class ConvertFromJsonNullableInputWithArgs implements DrillSimpleF @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + + @Inject + OptionManager options; + @Inject - DrillBuf buffer; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } @@ -324,46 +272,31 @@ public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFu @Param NullableVarCharHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, false, false, false); } } @@ -379,43 +312,32 @@ public static class ConvertFromJsonVarcharNullableInputWithConfigs implements Dr @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + @Inject - DrillBuf buffer; + OptionManager options; + + @Inject + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. 
", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java new file mode 100644 index 00000000000..d6ccaaf4d02 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.expr.fn.impl.conv; + + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions; +import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsonConverterUtils { + + private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class); + + /** + * Field name used to wrap the input value so that the record-oriented JSON + * loader can read top-level scalars and arrays (not just objects) uniformly. + * The single resulting column carries the converted value and is recognised + * and unwrapped by {@code ProjectRecordBatch} (which transfers it directly to + * the output column instead of wrapping the loader's columns in a map). + */ + public static final String WRAP_FIELD = "drill_json_value_wrapper"; + + private static final byte[] WRAP_PREFIX = + ("{\"" + WRAP_FIELD + "\":").getBytes(StandardCharsets.UTF_8); + private static final byte[] WRAP_SUFFIX = "}".getBytes(StandardCharsets.UTF_8); + + private JsonConverterUtils() { + } + + /** + * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs, using the + * system JSON options from the {@link OptionManager}. 
+ * + * @param rsLoader The {@link ResultSetLoader} used in the UDF + * @param options The {@link OptionManager} used in the UDF. This is used to extract the global JSON options + * @param stream An input stream containing the input JSON data + * @return A {@link JsonLoaderImpl} for use in the UDF. + */ + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + ClosingStreamIterator stream) { + JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .standardOptions(options) + .fromStream(() -> stream); + + return (JsonLoaderImpl) jsonLoaderBuilder.build(); + } + + /** + * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs, overriding the + * {@code allTextMode} and {@code readNumbersAsDouble} options with the values supplied + * as function arguments. Remaining options are taken from the system JSON options. + * + * @param rsLoader The {@link ResultSetLoader} used in the UDF + * @param options The {@link OptionManager} used in the UDF. This is used to extract the global JSON options + * @param stream An input stream containing the input JSON data + * @param allTextMode Whether to read all scalars as text + * @param readNumbersAsDouble Whether to read all numbers as doubles + * @return A {@link JsonLoaderImpl} for use in the UDF. + */ + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + ClosingStreamIterator stream, + boolean allTextMode, + boolean readNumbersAsDouble) { + JsonLoaderOptions jsonOptions = new JsonLoaderOptions(options); + jsonOptions.allTextMode = allTextMode; + jsonOptions.readNumbersAsDouble = readNumbersAsDouble; + + JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .options(jsonOptions) + .fromStream(() -> stream); + + return (JsonLoaderImpl) jsonLoaderBuilder.build(); + } + + /** + * Converts a single JSON value (one row of UDF input) into the result set loader. 
+ * + *

<p>Exactly one row is always written to the loader -- even for null/empty + * input -- so that the loader's row count stays aligned with the surrounding + * project batch. The value is wrapped in a single-field object so that + * top-level scalars and arrays parse the same way as objects.</p>

+ * + * @param rsLoader the result set loader injected into the UDF + * @param jsonLoader the lazily created JSON loader, or {@code null} on the first call + * @param options the system JSON options + * @param stream the streaming iterator bound to the JSON loader + * @param isSet 0 if the input value is null, 1 otherwise + * @param start start offset of the value in {@code buffer} + * @param end end offset of the value in {@code buffer} + * @param buffer buffer holding the input value + * @param useArgs whether to honour the explicit allTextMode/readNumbersAsDouble arguments + * @param allTextMode explicit all-text-mode argument (used only when {@code useArgs}) + * @param readNumbersAsDouble explicit read-numbers-as-double argument (used only when {@code useArgs}) + * @return the JSON loader (created on first use), to be cached by the caller + */ + public static JsonLoaderImpl convertJson(ResultSetLoader rsLoader, + JsonLoaderImpl jsonLoader, + OptionManager options, + ClosingStreamIterator stream, + int isSet, int start, int end, DrillBuf buffer, + boolean useArgs, + boolean allTextMode, + boolean readNumbersAsDouble) { + RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + // For null or empty input emit an (unset) row to keep the row count aligned. + if (isSet == 0 || start == end) { + rowWriter.save(); + return jsonLoader; + } + + try { + stream.setValue(getWrappedJsonStream(start, end, buffer)); + if (jsonLoader == null) { + jsonLoader = useArgs + ? createJsonLoader(rsLoader, options, stream, allTextMode, readNumbersAsDouble) + : createJsonLoader(rsLoader, options, stream); + } + // next() reads the single wrapped record; always save the row so the + // count stays aligned even for an empty document. + jsonLoader.parser().next(); + rowWriter.save(); + } catch (Exception e) { + throw UserException.dataReadError(e) + .message("Error while reading JSON. 
") + .addContext(e.getMessage()) + .build(logger); + } + return jsonLoader; + } + + /** + * Wraps the raw input value in a single-field JSON object keyed by {@link #WRAP_FIELD} + * so that the record-oriented JSON loader accepts top-level scalars and arrays. + */ + private static InputStream getWrappedJsonStream(int start, int end, DrillBuf buffer) { + InputStream value = DrillBufInputStream.getStream(start, end, buffer); + return new SequenceInputStream(Collections.enumeration(Arrays.asList( + new ByteArrayInputStream(WRAP_PREFIX), + value, + new ByteArrayInputStream(WRAP_SUFFIX)))); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java index 056f4c09a75..96bfb378ab4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java @@ -95,9 +95,10 @@ public ValueVectorWriteExpression addOutputVector(String name, LogicalExpression @Override public void addComplexField(FieldReference ref) { - if (projectBatch.rsLoader == null) { - initComplexWriters(); - } + // The result set loaders are not registered until the generated projector + // setup runs (after this point), so the complex-writer list is always + // (re)initialised here; it is simply left unused when loaders are present. 
+ initComplexWriters(); if (projectBatch.complexFieldReferencesList == null) { projectBatch.complexFieldReferencesList = Lists.newArrayList(); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a23227a275e..bce6f191ba6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -28,12 +28,14 @@ import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SimpleRecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; @@ -54,7 +56,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch { protected List allocationVectors; @Deprecated // use new writer rsLoader protected List complexWriters; - protected ResultSetLoader rsLoader; + // One result set loader per complex-writer (EVF) function in the project list. + // rsLoaderRefs holds each loader's output column name, captured at codegen, so + // the harvested column lands in the slot reserved for that function. 
+ protected List rsLoaders; + private List rsLoaderRefs; + // True once the loaders have been harvested and must be re-started before the + // next output batch is written. + private boolean loadersNeedStart; protected List complexFieldReferencesList; protected ProjectMemoryManager memoryManager; private Projector projector; @@ -111,7 +120,7 @@ protected IterOutcome doWork() { memoryManager.update(); if (first && incomingRecordCount == 0) { - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null ) { + if (!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) { IterOutcome next = null; while (incomingRecordCount == 0) { if (getLastKnownOutcome() == EMIT) { @@ -146,7 +155,7 @@ protected IterOutcome doWork() { } } - if ((!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) && getLastKnownOutcome() == EMIT) { + if ((!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) && getLastKnownOutcome() == EMIT) { throw UserException.unsupportedError() .message("Currently functions producing complex types as output are not " + "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " + @@ -162,6 +171,7 @@ protected IterOutcome doWork() { memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); doAlloc(maxOuputRecordCount); + startLoaderBatchIfNeeded(); long projectStartTime = System.currentTimeMillis(); int outputRecords = projector.projectRecords(incoming, 0, maxOuputRecordCount, 0); long projectEndTime = System.currentTimeMillis(); @@ -178,14 +188,8 @@ protected IterOutcome doWork() { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. 
- if (rsLoader != null) { - MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class); - map.setMapValueCount(recordCount); - for (VectorWrapper vectorWrapper : rsLoader.harvest()) { - ValueVector valueVector = vectorWrapper.getValueVector(); - map.putChild(valueVector.getField().getName(), valueVector); - } - container.buildSchema(SelectionVectorMode.NONE); + if (!CollectionUtils.isEmpty(rsLoaders)) { + harvestLoaders(); } else if (!CollectionUtils.isEmpty(complexWriters)) { container.buildSchema(SelectionVectorMode.NONE); } @@ -202,6 +206,7 @@ private void handleRemainder() { assert memoryManager.incomingBatch() == incoming; int recordsToProcess = Math.min(remainingRecordCount, memoryManager.getOutputRowCount()); doAlloc(recordsToProcess); + startLoaderBatchIfNeeded(); logger.trace("handleRemainder: remaining RC {}, toProcess {}, remainder index {}, incoming {}, Project {}", remainingRecordCount, recordsToProcess, remainderIndex, incoming, this); @@ -225,7 +230,9 @@ private void handleRemainder() { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { + if (!CollectionUtils.isEmpty(rsLoaders)) { + harvestLoaders(); + } else if (!CollectionUtils.isEmpty(complexWriters)) { container.buildSchema(SelectionVectorMode.NONE); } @@ -239,8 +246,65 @@ public void addComplexWriter(ComplexWriter writer) { complexWriters.add(writer); } - public void addLoader(ResultSetLoader loader) { - rsLoader = loader; + public void addLoader(ResultSetLoader loader, String refName) { + if (rsLoaders == null) { + rsLoaders = new ArrayList<>(); + rsLoaderRefs = new ArrayList<>(); + } + rsLoaders.add(loader); + rsLoaderRefs.add(refName); + } + + /** + * Re-starts each result set loader batch when the previous batch has already + * been harvested. 
The loaders start their first batch from the generated UDF + * setup; this drives every subsequent output batch. + */ + private void startLoaderBatchIfNeeded() { + if (loadersNeedStart && !CollectionUtils.isEmpty(rsLoaders)) { + for (ResultSetLoader loader : rsLoaders) { + loader.startBatch(); + } + loadersNeedStart = false; + } + } + + /** + * Harvests each complex-writer function's result set loader and moves its + * single (wrapped) output column into the container slot reserved for that + * function (see {@link ProjectBatchBuilder#addComplexField}). + */ + private void harvestLoaders() { + for (int i = 0; i < rsLoaders.size(); i++) { + String refName = rsLoaderRefs.get(i); + VectorContainer harvested = rsLoaders.get(i).harvest(); + List columns = new ArrayList<>(); + for (VectorWrapper w : harvested) { + columns.add(w.getValueVector()); + } + + if (columns.size() == 1 + && JsonConverterUtils.WRAP_FIELD.equals(columns.get(0).getField().getName())) { + // convert_fromJSON wraps its value in a single marker column. Unwrap it + // so the output preserves the value's own type (scalar, array or map). + ValueVector src = columns.get(0); + ValueVector dst = container.addOrGet( + MaterializedField.create(refName, src.getField().getType()), callBack); + src.makeTransferPair(dst).transfer(); + } else { + // The loader produced the JSON object's fields directly (e.g. the HTTP + // UDFs, or an empty/all-null document). Wrap them in a map named for the + // output column. 
+ MapVector map = container.addOrGet(refName, + Types.required(TypeProtos.MinorType.MAP), MapVector.class); + map.setMapValueCount(recordCount); + for (ValueVector src : columns) { + map.putChild(src.getField().getName(), src); + } + } + } + loadersNeedStart = true; + container.buildSchema(SelectionVectorMode.NONE); } private void doAlloc(int recordCount) { @@ -274,8 +338,10 @@ private void setValueCount(int count) { for (ComplexWriter writer : complexWriters) { writer.setValueCount(count); } - } else if (rsLoader != null) { - rsLoader.setTargetRowCount(count); + } else if (!CollectionUtils.isEmpty(rsLoaders)) { + for (ResultSetLoader loader : rsLoaders) { + loader.setTargetRowCount(count); + } } } @@ -359,7 +425,7 @@ protected IterOutcome handleNullInput() { protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) { // In a case of complex writers vectors are added at runtime, so the schema // may change (e.g. when a batch contains new column(s) not present in previous batches) - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { + if (!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) { return IterOutcome.OK_NEW_SCHEMA; } return super.getFinalOutcome(hasMoreRecordInBoundary); @@ -375,9 +441,16 @@ private void setupNewSchema(RecordBatch incomingBatch, int configuredBatchSize) } allocationVectors = new ArrayList<>(); - if (rsLoader != null) { + if (!CollectionUtils.isEmpty(rsLoaders)) { container.clear(); - rsLoader.close(); + for (ResultSetLoader loader : rsLoaders) { + loader.close(); + } + // The generated projector setup repopulates these (one loader per + // complex-writer function) on the rebuild that follows. + rsLoaders = null; + rsLoaderRefs = null; + loadersNeedStart = false; } else if (!CollectionUtils.isEmpty(complexWriters)) { container.clear(); } else {