diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java index 4f6491c0920..8b468019bae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java @@ -87,7 +87,7 @@ protected HoldingContainer generateEvalBody(ClassGenerator classGenerator, Ho classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex())); sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter); } else { - classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader)); + classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader).arg(JExpr.lit(refName))); sub.decl(classGenerator.getModel()._ref(ResultSetLoader.class), getReturnValue().getName(), rsLoader); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 61e64a55dcc..a64ce5b2f18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.expr.fn.impl.conv; -import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; @@ -30,11 +29,13 @@ import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import javax.inject.Inject; +@SuppressWarnings("unused") public class JsonConvertFrom { private JsonConvertFrom() { @@ -46,39 +47,31 @@ public static class ConvertFromJson implements DrillSimpleFunc { @Param VarBinaryHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, false, false, false); } } @@ -94,79 +87,66 @@ public static class ConvertFromJsonWithArgs implements DrillSimpleFunc { @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + + @Inject + OptionManager options; + @Inject - DrillBuf buffer; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } - @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, isRandom = true) public static class ConvertFromJsonVarchar implements DrillSimpleFunc { @Param VarCharHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, false, false, false); } } @@ -182,36 +162,32 @@ public static class ConvertFromJsonVarcharWithConfig implements DrillSimpleFunc @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + @Inject - DrillBuf buffer; + OptionManager options; + + @Inject + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, 1, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } @@ -221,47 +197,31 @@ public static class ConvertFromJsonNullableInput implements DrillSimpleFunc { @Param NullableVarBinaryHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, false, false, false); } } @@ -277,44 +237,32 @@ public static class ConvertFromJsonNullableInputWithArgs implements DrillSimpleF @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + + @Inject + OptionManager options; + @Inject - DrillBuf buffer; + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output - ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } @@ -324,46 +272,31 @@ public static class ConvertFromJsonVarcharNullableInput implements DrillSimpleFu @Param NullableVarCharHolder in; - @Inject - DrillBuf buffer; + @Output + ComplexWriter writer; @Inject OptionManager options; + @Inject + ResultSetLoader rsLoader; + @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE); - boolean readNumbersAsDouble = options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE); - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, false, false, false); } } @@ -379,43 +312,32 @@ public static class ConvertFromJsonVarcharNullableInputWithConfigs implements Dr @Param BitHolder readNumbersAsDoubleHolder; + @Output + ComplexWriter writer; + @Inject - DrillBuf buffer; + OptionManager options; + + @Inject + ResultSetLoader rsLoader; @Workspace - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader; + org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator streamIter; - @Output ComplexWriter writer; + @Workspace + org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl jsonLoader; @Override public void setup() { - boolean allTextMode = allTextModeHolder.value == 1; - boolean readNumbersAsDouble = readNumbersAsDoubleHolder.value == 1; - - jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer) - .defaultSchemaPathColumns() - .allTextMode(allTextMode) - .readNumbersAsDouble(readNumbersAsDouble) - .build(); + streamIter = new org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator(); + rsLoader.startBatch(); } @Override public void eval() { - if (in.isSet == 0) { - // Return empty map - org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap(); - mapWriter.start(); - mapWriter.end(); - return; - } - - try { - jsonReader.setSource(in.start, in.end, in.buffer); - jsonReader.write(writer); - buffer = jsonReader.getWorkBuf(); - } catch (Exception e) { - throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); - } + jsonLoader = org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils.convertJson( + rsLoader, jsonLoader, options, streamIter, in.isSet, in.start, in.end, in.buffer, true, + allTextModeHolder.value == 1, readNumbersAsDoubleHolder.value == 1); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java new file mode 100644 index 00000000000..d6ccaaf4d02 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConverterUtils.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.expr.fn.impl.conv; + + +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.physical.resultSet.ResultSetLoader; +import org.apache.drill.exec.physical.resultSet.RowSetLoader; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder; +import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions; +import org.apache.drill.exec.vector.complex.fn.DrillBufInputStream; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.io.SequenceInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JsonConverterUtils { + + private static final Logger logger = LoggerFactory.getLogger(JsonConverterUtils.class); + + /** + * Field name used to wrap the input value so that the record-oriented JSON + * loader can read top-level scalars and arrays (not just objects) uniformly. + * The single resulting column carries the converted value and is recognised + * and unwrapped by {@code ProjectRecordBatch} (which transfers it directly to + * the output column instead of wrapping the loader's columns in a map). + */ + public static final String WRAP_FIELD = "drill_json_value_wrapper"; + + private static final byte[] WRAP_PREFIX = + ("{\"" + WRAP_FIELD + "\":").getBytes(StandardCharsets.UTF_8); + private static final byte[] WRAP_SUFFIX = "}".getBytes(StandardCharsets.UTF_8); + + private JsonConverterUtils() { + } + + /** + * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs, using the + * system JSON options from the {@link OptionManager}. + * + * @param rsLoader The {@link ResultSetLoader} used in the UDF + * @param options The {@link OptionManager} used in the UDF. This is used to extract the global JSON options + * @param stream An input stream containing the input JSON data + * @return A {@link JsonLoaderImpl} for use in the UDF. + */ + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + ClosingStreamIterator stream) { + JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .standardOptions(options) + .fromStream(() -> stream); + + return (JsonLoaderImpl) jsonLoaderBuilder.build(); + } + + /** + * Creates a {@link JsonLoaderImpl} for use in JSON conversion UDFs, overriding the + * {@code allTextMode} and {@code readNumbersAsDouble} options with the values supplied + * as function arguments. Remaining options are taken from the system JSON options. + * + * @param rsLoader The {@link ResultSetLoader} used in the UDF + * @param options The {@link OptionManager} used in the UDF. This is used to extract the global JSON options + * @param stream An input stream containing the input JSON data + * @param allTextMode Whether to read all scalars as text + * @param readNumbersAsDouble Whether to read all numbers as doubles + * @return A {@link JsonLoaderImpl} for use in the UDF. + */ + public static JsonLoaderImpl createJsonLoader(ResultSetLoader rsLoader, + OptionManager options, + ClosingStreamIterator stream, + boolean allTextMode, + boolean readNumbersAsDouble) { + JsonLoaderOptions jsonOptions = new JsonLoaderOptions(options); + jsonOptions.allTextMode = allTextMode; + jsonOptions.readNumbersAsDouble = readNumbersAsDouble; + + JsonLoaderBuilder jsonLoaderBuilder = new JsonLoaderBuilder() + .resultSetLoader(rsLoader) + .options(jsonOptions) + .fromStream(() -> stream); + + return (JsonLoaderImpl) jsonLoaderBuilder.build(); + } + + /** + * Converts a single JSON value (one row of UDF input) into the result set loader. + * + *

Exactly one row is always written to the loader -- even for null/empty + * input -- so that the loader's row count stays aligned with the surrounding + * project batch. The value is wrapped in a single-field object so that + * top-level scalars and arrays parse the same way as objects.

+ * + * @param rsLoader the result set loader injected into the UDF + * @param jsonLoader the lazily created JSON loader, or {@code null} on the first call + * @param options the system JSON options + * @param stream the streaming iterator bound to the JSON loader + * @param isSet 0 if the input value is null, 1 otherwise + * @param start start offset of the value in {@code buffer} + * @param end end offset of the value in {@code buffer} + * @param buffer buffer holding the input value + * @param useArgs whether to honour the explicit allTextMode/readNumbersAsDouble arguments + * @param allTextMode explicit all-text-mode argument (used only when {@code useArgs}) + * @param readNumbersAsDouble explicit read-numbers-as-double argument (used only when {@code useArgs}) + * @return the JSON loader (created on first use), to be cached by the caller + */ + public static JsonLoaderImpl convertJson(ResultSetLoader rsLoader, + JsonLoaderImpl jsonLoader, + OptionManager options, + ClosingStreamIterator stream, + int isSet, int start, int end, DrillBuf buffer, + boolean useArgs, + boolean allTextMode, + boolean readNumbersAsDouble) { + RowSetLoader rowWriter = rsLoader.writer(); + rowWriter.start(); + // For null or empty input emit an (unset) row to keep the row count aligned. + if (isSet == 0 || start == end) { + rowWriter.save(); + return jsonLoader; + } + + try { + stream.setValue(getWrappedJsonStream(start, end, buffer)); + if (jsonLoader == null) { + jsonLoader = useArgs + ? createJsonLoader(rsLoader, options, stream, allTextMode, readNumbersAsDouble) + : createJsonLoader(rsLoader, options, stream); + } + // next() reads the single wrapped record; always save the row so the + // count stays aligned even for an empty document. + jsonLoader.parser().next(); + rowWriter.save(); + } catch (Exception e) { + throw UserException.dataReadError(e) + .message("Error while reading JSON. ") + .addContext(e.getMessage()) + .build(logger); + } + return jsonLoader; + } + + /** + * Wraps the raw input value in a single-field JSON object ({@code {"json": }}) + * so that the record-oriented JSON loader accepts top-level scalars and arrays. + */ + private static InputStream getWrappedJsonStream(int start, int end, DrillBuf buffer) { + InputStream value = DrillBufInputStream.getStream(start, end, buffer); + return new SequenceInputStream(Collections.enumeration(Arrays.asList( + new ByteArrayInputStream(WRAP_PREFIX), + value, + new ByteArrayInputStream(WRAP_SUFFIX)))); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java index 056f4c09a75..96bfb378ab4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java @@ -95,9 +95,10 @@ public ValueVectorWriteExpression addOutputVector(String name, LogicalExpression @Override public void addComplexField(FieldReference ref) { - if (projectBatch.rsLoader == null) { - initComplexWriters(); - } + // The result set loaders are not registered until the generated projector + // setup runs (after this point), so the complex-writer list is always + // (re)initialised here; it is simply left unused when loaders are present. + initComplexWriters(); if (projectBatch.complexFieldReferencesList == null) { projectBatch.complexFieldReferencesList = Lists.newArrayList(); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a23227a275e..bce6f191ba6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -28,12 +28,14 @@ import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.resultSet.ResultSetLoader; import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.SimpleRecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.expr.fn.impl.conv.JsonConverterUtils; import org.apache.drill.exec.util.record.RecordBatchStats; import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType; import org.apache.drill.exec.vector.AllocationHelper; @@ -54,7 +56,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch { protected List allocationVectors; @Deprecated // use new writer rsLoader protected List complexWriters; - protected ResultSetLoader rsLoader; + // One result set loader per complex-writer (EVF) function in the project list. + // rsLoaderRefs holds each loader's output column name, captured at codegen, so + // the harvested column lands in the slot reserved for that function. + protected List rsLoaders; + private List rsLoaderRefs; + // True once the loaders have been harvested and must be re-started before the + // next output batch is written. + private boolean loadersNeedStart; protected List complexFieldReferencesList; protected ProjectMemoryManager memoryManager; private Projector projector; @@ -111,7 +120,7 @@ protected IterOutcome doWork() { memoryManager.update(); if (first && incomingRecordCount == 0) { - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null ) { + if (!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) { IterOutcome next = null; while (incomingRecordCount == 0) { if (getLastKnownOutcome() == EMIT) { @@ -146,7 +155,7 @@ protected IterOutcome doWork() { } } - if ((!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) && getLastKnownOutcome() == EMIT) { + if ((!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) && getLastKnownOutcome() == EMIT) { throw UserException.unsupportedError() .message("Currently functions producing complex types as output are not " + "supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " + @@ -162,6 +171,7 @@ protected IterOutcome doWork() { memoryManager.getOutputRowCount(), incomingRecordCount, incoming, this); doAlloc(maxOuputRecordCount); + startLoaderBatchIfNeeded(); long projectStartTime = System.currentTimeMillis(); int outputRecords = projector.projectRecords(incoming, 0, maxOuputRecordCount, 0); long projectEndTime = System.currentTimeMillis(); @@ -178,14 +188,8 @@ protected IterOutcome doWork() { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. - if (rsLoader != null) { - MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class); - map.setMapValueCount(recordCount); - for (VectorWrapper vectorWrapper : rsLoader.harvest()) { - ValueVector valueVector = vectorWrapper.getValueVector(); - map.putChild(valueVector.getField().getName(), valueVector); - } - container.buildSchema(SelectionVectorMode.NONE); + if (!CollectionUtils.isEmpty(rsLoaders)) { + harvestLoaders(); } else if (!CollectionUtils.isEmpty(complexWriters)) { container.buildSchema(SelectionVectorMode.NONE); } @@ -202,6 +206,7 @@ private void handleRemainder() { assert memoryManager.incomingBatch() == incoming; int recordsToProcess = Math.min(remainingRecordCount, memoryManager.getOutputRowCount()); doAlloc(recordsToProcess); + startLoaderBatchIfNeeded(); logger.trace("handleRemainder: remaining RC {}, toProcess {}, remainder index {}, incoming {}, Project {}", remainingRecordCount, recordsToProcess, remainderIndex, incoming, this); @@ -225,7 +230,9 @@ private void handleRemainder() { } // In case of complex writer expression, vectors would be added to batch run-time. // We have to re-build the schema. - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { + if (!CollectionUtils.isEmpty(rsLoaders)) { + harvestLoaders(); + } else if (!CollectionUtils.isEmpty(complexWriters)) { container.buildSchema(SelectionVectorMode.NONE); } @@ -239,8 +246,65 @@ public void addComplexWriter(ComplexWriter writer) { complexWriters.add(writer); } - public void addLoader(ResultSetLoader loader) { - rsLoader = loader; + public void addLoader(ResultSetLoader loader, String refName) { + if (rsLoaders == null) { + rsLoaders = new ArrayList<>(); + rsLoaderRefs = new ArrayList<>(); + } + rsLoaders.add(loader); + rsLoaderRefs.add(refName); + } + + /** + * Re-starts each result set loader batch when the previous batch has already + * been harvested. The loaders start their first batch from the generated UDF + * setup; this drives every subsequent output batch. + */ + private void startLoaderBatchIfNeeded() { + if (loadersNeedStart && !CollectionUtils.isEmpty(rsLoaders)) { + for (ResultSetLoader loader : rsLoaders) { + loader.startBatch(); + } + loadersNeedStart = false; + } + } + + /** + * Harvests each complex-writer function's result set loader and moves its + * single (wrapped) output column into the container slot reserved for that + * function (see {@link ProjectBatchBuilder#addComplexField}). + */ + private void harvestLoaders() { + for (int i = 0; i < rsLoaders.size(); i++) { + String refName = rsLoaderRefs.get(i); + VectorContainer harvested = rsLoaders.get(i).harvest(); + List columns = new ArrayList<>(); + for (VectorWrapper w : harvested) { + columns.add(w.getValueVector()); + } + + if (columns.size() == 1 + && JsonConverterUtils.WRAP_FIELD.equals(columns.get(0).getField().getName())) { + // convert_fromJSON wraps its value in a single marker column. Unwrap it + // so the output preserves the value's own type (scalar, array or map). + ValueVector src = columns.get(0); + ValueVector dst = container.addOrGet( + MaterializedField.create(refName, src.getField().getType()), callBack); + src.makeTransferPair(dst).transfer(); + } else { + // The loader produced the JSON object's fields directly (e.g. the HTTP + // UDFs, or an empty/all-null document). Wrap them in a map named for the + // output column. + MapVector map = container.addOrGet(refName, + Types.required(TypeProtos.MinorType.MAP), MapVector.class); + map.setMapValueCount(recordCount); + for (ValueVector src : columns) { + map.putChild(src.getField().getName(), src); + } + } + } + loadersNeedStart = true; + container.buildSchema(SelectionVectorMode.NONE); } private void doAlloc(int recordCount) { @@ -274,8 +338,10 @@ private void setValueCount(int count) { for (ComplexWriter writer : complexWriters) { writer.setValueCount(count); } - } else if (rsLoader != null) { - rsLoader.setTargetRowCount(count); + } else if (!CollectionUtils.isEmpty(rsLoaders)) { + for (ResultSetLoader loader : rsLoaders) { + loader.setTargetRowCount(count); + } } } @@ -359,7 +425,7 @@ protected IterOutcome handleNullInput() { protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) { // In a case of complex writers vectors are added at runtime, so the schema // may change (e.g. when a batch contains new column(s) not present in previous batches) - if (!CollectionUtils.isEmpty(complexWriters) || rsLoader != null) { + if (!CollectionUtils.isEmpty(complexWriters) || !CollectionUtils.isEmpty(rsLoaders)) { return IterOutcome.OK_NEW_SCHEMA; } return super.getFinalOutcome(hasMoreRecordInBoundary); @@ -375,9 +441,16 @@ private void setupNewSchema(RecordBatch incomingBatch, int configuredBatchSize) } allocationVectors = new ArrayList<>(); - if (rsLoader != null) { + if (!CollectionUtils.isEmpty(rsLoaders)) { container.clear(); - rsLoader.close(); + for (ResultSetLoader loader : rsLoaders) { + loader.close(); + } + // The generated projector setup repopulates these (one loader per + // complex-writer function) on the rebuild that follows. + rsLoaders = null; + rsLoaderRefs = null; + loadersNeedStart = false; } else if (!CollectionUtils.isEmpty(complexWriters)) { container.clear(); } else {