Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

Expand Down Expand Up @@ -110,8 +111,13 @@ private WindowTableFunctionOperatorBase createAlignedWindowTableFunctionOperator
TimeWindowUtil.getShiftTimeZone(
windowingStrategy.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
final int timestampPrecision =
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());
return new AlignedWindowTableFunctionOperator(
windowAssigner, windowingStrategy.getTimeAttributeIndex(), shiftTimeZone);
windowAssigner,
windowingStrategy.getTimeAttributeIndex(),
timestampPrecision,
shiftTimeZone);
}

protected abstract Transformation<RowData> translateWithUnalignedWindow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -152,12 +153,15 @@ private WindowTableFunctionOperatorBase createUnalignedWindowTableFunctionOperat
TimeWindowUtil.getShiftTimeZone(
windowingStrategy.getTimeAttributeType(),
TableConfigUtils.getLocalTimeZone(config));
final int timestampPrecision =
LogicalTypeChecks.getPrecision(windowingStrategy.getTimeAttributeType());

return new UnalignedWindowTableFunctionOperator(
windowAssigner,
windowAssigner.getWindowSerializer(new ExecutionConfig()),
new RowDataSerializer(inputRowType),
windowingStrategy.getTimeAttributeIndex(),
timestampPrecision,
shiftTimeZone);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public List<TableTestProgram> programs() {
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_AGG,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_POSITIVE_OFFSET,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET);
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_NEGATIVE_OFFSET,
WindowTableFunctionTestPrograms.WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,49 @@ public class WindowTableFunctionTestPrograms {
+ " %s\n"
+ " GROUP BY window_start, window_end";

static final String QUERY_TVF_UNION_ALL_VALUES =
"INSERT INTO sink_t SELECT\n"
+ " * FROM (\n"
+ " WITH values_table AS (\n"
+ " SELECT cast('2024-01-01 10:00:00' AS TIMESTAMP_LTZ) AS event_time\n"
+ " UNION ALL\n"
+ " SELECT cast('2024-01-01 10:05:00' AS TIMESTAMP_LTZ) AS event_time\n"
+ " UNION ALL\n"
+ " SELECT cast('2024-01-01 10:10:00' AS TIMESTAMP_LTZ) AS event_time\n"
+ " ) SELECT\n"
+ " window_start,\n"
+ " window_end\n"
+ " FROM TABLE(\n"
+ " HOP(\n"
+ " TABLE values_table,\n"
+ " DESCRIPTOR(event_time),\n"
+ " INTERVAL '1' MINUTES,\n"
+ " INTERVAL '2' MINUTES)\n"
+ " ) GROUP BY\n"
+ " window_start,\n"
+ " window_end\n"
+ ")";

public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF_UNION_ALL =
TableTestProgram.of(
"window-table-function-tumble-tvf-union-all",
"validates window with BinaryRowData non-compact timestamp precision")
.setupTableSink(
SinkTestStep.newBuilder("sink_t")
.addSchema(
"window_start TIMESTAMP(3)", "window_end TIMESTAMP(3)")
.consumedBeforeRestore(
"+I[2024-01-01T09:59, 2024-01-01T10:01]",
"+I[2024-01-01T10:00, 2024-01-01T10:02]",
"+I[2024-01-01T10:04, 2024-01-01T10:06]",
"+I[2024-01-01T10:05, 2024-01-01T10:07]",
"+I[2024-01-01T10:09, 2024-01-01T10:11]",
"+I[2024-01-01T10:10, 2024-01-01T10:12]")
.build())
.setupConfig(TableConfigOptions.LOCAL_TIME_ZONE, "UTC")
.runSql(QUERY_TVF_UNION_ALL_VALUES)
.build();

public static final TableTestProgram WINDOW_TABLE_FUNCTION_TUMBLE_TVF =
TableTestProgram.of(
"window-table-function-tumble-tvf",
Expand Down
Loading