Data Engineer Hub

SQL and Query Engines for Data Engineers (2026)

In short

SQL and query engines for data engineers in 2026 mean writing SQL beyond basic SELECT/JOIN — window functions, recursive CTEs, lateral joins, and MERGE — and understanding how the engine actually runs that SQL. Four things to know deeply: first, the advanced SQL idioms that show up in every senior+ data pipeline (LAG/LEAD/ROW_NUMBER for sessionization, recursive CTEs for graph traversal, lateral joins for per-row subqueries, MERGE for upserts into Iceberg/Delta). Second, query-engine internals: how Spark's Catalyst optimizer, Trino's coordinator-worker model, Databricks Photon, and Snowflake's vectorized engine plan and execute. Third, the optimizations that decide whether a query takes 30 seconds or 30 minutes: partition pruning, predicate pushdown, and column pruning. Fourth, broadcast vs shuffle joins: when each wins and how to force the right one with hints.

Key takeaways

  • Window functions are the senior+ SQL primitive. LAG/LEAD for time-series diffs, ROW_NUMBER for deduplication, NTILE for percentile bucketing, and SUM/AVG OVER for running aggregates. Every senior data engineer writes these weekly. The Spark SQL reference (spark.apache.org/docs/latest/sql-ref.html) and Trino docs (trino.io/docs/current) document the supported syntax and edge cases around frame specification.
  • Recursive CTEs handle graph and hierarchy traversal — org charts, category trees, multi-hop lineage. Lateral joins (LATERAL in Postgres/Trino, also CROSS APPLY in SQL Server) let you reference outer columns in a subquery, enabling per-row top-N queries that would otherwise require window functions plus a filter.
  • MERGE (a.k.a. UPSERT) is the canonical write operation against Iceberg, Delta Lake, and Hudi tables. It expresses INSERT, UPDATE, and DELETE in a single atomic statement matched on a join condition. Modern lakehouse engines (Databricks, Snowflake, Trino-Iceberg) all support MERGE natively as of 2026.
  • Partition pruning is the single highest-leverage performance optimization in lakehouse queries. The engine reads only the partitions that match the WHERE clause; everything else is skipped without ever opening the file. Verify pruning fired in the query plan — a scan that reads 4,000 partitions when the filter should select 7 means the predicate didn't reach the metadata layer.
  • Predicate pushdown moves WHERE filters as close to the storage layer as possible — into the Parquet row-group footer, into the Iceberg manifest, into the connector. Column pruning does the same for projection. Together they decide whether the engine reads gigabytes or terabytes from object storage. Spark Catalyst, Trino's planner, and Snowflake's optimizer all do this automatically when the SQL is written cleanly.
  • Broadcast joins win when one side is small enough to fit in executor memory (Spark default threshold: 10 MB; tunable to ~1 GB). The small side is shipped to every worker and held in a hash table; the large side streams through. Shuffle joins partition both sides by the join key across the cluster; cheaper when both sides are large but expensive on network.
  • Reading EXPLAIN plans across engines is a senior+ skill. Spark's `EXPLAIN FORMATTED` shows the physical plan with WholeStageCodegen blocks; Trino's `EXPLAIN (TYPE DISTRIBUTED)` shows fragments and exchange operators; Snowflake's query profile shows the operator tree with bytes-scanned and partition-pruning stats. The vocabulary differs but the mental model is the same.

Window functions every senior DE writes weekly

Window functions are the SQL primitive that separates intermediate from senior data engineers. Where GROUP BY collapses rows into one per group, window functions compute aggregates while preserving every input row — enabling sessionization, running totals, deduplication, and rank-based filtering in pure SQL without subqueries or self-joins.

Four patterns show up in every senior+ data pipeline:

  • LAG / LEAD for time-series differences and gap detection — comparing each row to the previous or next row partitioned by some key (user, device, account).
  • ROW_NUMBER with PARTITION BY for deduplication — keep the first or latest row per natural key when source data has duplicates.
  • NTILE for percentile bucketing — divide an ordered set into N equal buckets for cohort analysis or quantile reporting.
  • SUM / AVG / COUNT OVER for running aggregates — cumulative revenue, 7-day rolling average, expanding window counts.

A canonical example combining all four:

-- For each user, compute (1) a per-user event sequence,
-- (2) session boundaries from a 30-minute inactivity gap,
-- (3) a quartile bucket by session length.
WITH events AS (
  SELECT user_id, event_ts, page,
         ROW_NUMBER() OVER (
           PARTITION BY user_id ORDER BY event_ts
         ) AS event_seq,
         LAG(event_ts) OVER (
           PARTITION BY user_id ORDER BY event_ts
         ) AS prev_ts
  FROM raw.page_views
  WHERE event_date = DATE '2026-04-29'
),
sessionized AS (
  SELECT *,
         CASE
           WHEN prev_ts IS NULL
             OR event_ts - prev_ts > INTERVAL '30' MINUTE
           THEN 1 ELSE 0 END AS is_session_start,
         SUM(CASE
               WHEN prev_ts IS NULL
                 OR event_ts - prev_ts > INTERVAL '30' MINUTE
               THEN 1 ELSE 0 END
         ) OVER (PARTITION BY user_id ORDER BY event_ts
                 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
         ) AS session_id
  FROM events
)
SELECT user_id, session_id,
       MIN(event_ts) AS session_start,
       MAX(event_ts) AS session_end,
       NTILE(4) OVER (
         ORDER BY MAX(event_ts) - MIN(event_ts)
       ) AS length_quartile
FROM sessionized
GROUP BY user_id, session_id;

What this query gets right: the 30-minute inactivity boundary is declarative; the running-sum window assigns a stable session_id per user; NTILE classifies sessions into quartiles in one pass. The same logic in Python or RDD code would require multiple stages and explicit state.

Frame specification matters. The default frame for an ORDER BY window is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW — running aggregate semantics. For row-positional windows (a 7-row trailing average regardless of timestamp), use ROWS BETWEEN 6 PRECEDING AND CURRENT ROW. Mixing the two silently produces wrong answers; the Spark SQL reference (spark.apache.org/docs/latest/sql-ref.html) and Trino docs (trino.io/docs/current) document the edge cases.
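
A minimal side-by-side of the two frames, assuming a hypothetical sales.daily_sales table with one row per (store_id, sale_date):

-- ROWS vs RANGE on the same ordering. sales.daily_sales(store_id,
-- sale_date, amount) is an illustrative table, not one from above.
SELECT store_id, sale_date, amount,
       -- Row-positional: exactly the 6 preceding rows plus the current row.
       AVG(amount) OVER (
         PARTITION BY store_id ORDER BY sale_date
         ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
       ) AS trailing_7_row_avg,
       -- Default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):
       -- a running average that also includes peer rows tied on sale_date.
       AVG(amount) OVER (
         PARTITION BY store_id ORDER BY sale_date
       ) AS running_avg
FROM sales.daily_sales;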

Recursive CTEs and lateral joins round out the senior+ toolkit. Recursive CTEs handle graph traversal — manager hierarchies, category trees, multi-hop lineage. Lateral joins (LATERAL in Postgres and Trino) let a subquery reference outer columns, enabling per-row top-N queries. MERGE expresses upserts atomically against Iceberg, Delta, and Hudi: matched rows update, unmatched source rows insert, optionally unmatched target rows delete. As of 2026 every major lakehouse engine supports MERGE natively, and it is the correct write primitive for SCD and CDC pipelines.
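
Minimal sketches of all three, using illustrative schema names (hr.employees, crm.customers, sales.orders, warehouse.user_dim, and staging.user_updates are assumptions for this example). Recursive-CTE support varies by engine and version, so check your Spark release before relying on WITH RECURSIVE:

-- Recursive CTE: walk a manager hierarchy from the root downward.
WITH RECURSIVE org (employee_id, manager_id, name, depth) AS (
  SELECT employee_id, manager_id, name, 1 AS depth
  FROM hr.employees
  WHERE manager_id IS NULL                      -- anchor: the root
  UNION ALL
  SELECT e.employee_id, e.manager_id, e.name, o.depth + 1
  FROM hr.employees e
  JOIN org o ON e.manager_id = o.employee_id    -- recursive step
)
SELECT * FROM org;

-- Lateral join: top-3 orders per customer; the subquery references
-- the outer row (c.customer_id), which a plain subquery cannot do.
SELECT c.customer_id, o.order_id, o.total
FROM crm.customers c
CROSS JOIN LATERAL (
  SELECT order_id, total
  FROM sales.orders
  WHERE customer_id = c.customer_id
  ORDER BY total DESC
  LIMIT 3
) o;

-- MERGE: one atomic statement for a CDC feed with inserts, updates,
-- and deletes, matched on the natural key.
MERGE INTO warehouse.user_dim t
USING staging.user_updates s
  ON t.user_id = s.user_id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN UPDATE SET email = s.email, updated_at = s.updated_at
WHEN NOT MATCHED THEN
  INSERT (user_id, email, updated_at)
  VALUES (s.user_id, s.email, s.updated_at);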

How Spark and Trino plan a query — partition pruning + predicate pushdown

Modern query engines share the same high-level plan: parse SQL into a logical plan, optimize with rule-based and cost-based transformations, translate to a physical plan, execute. The names differ — Spark calls this Catalyst, Trino calls it the planner, Snowflake hides it in the cloud service layer — but the optimizations that matter are the same.

Partition pruning uses the WHERE clause to eliminate entire partitions before any data is read. If a table is partitioned by event_date and the query filters that column, the engine consults the metadata catalog (Hive Metastore, Iceberg manifests, Delta transaction log) and reads only matching partitions. A 2,000-partition table becomes a 1-partition scan. Iceberg's hidden partitioning (e.g., days(event_ts) as the partition transform) lets the engine prune even when the WHERE clause is on the timestamp directly.

Predicate pushdown moves filters as far down the plan as possible. A WHERE clause on a Parquet column is pushed to the Parquet reader, which uses row-group min/max statistics in the footer to skip row groups that can't match. Iceberg additionally pushes predicates into the manifest layer, eliminating files before they're opened. Column pruning is the projection equivalent: a 200-column Parquet table read for 5 columns reads ~2.5% of the storage bytes.

A worked example with a Spark plan:

-- Iceberg fact table partitioned by days(event_ts), bucketed by user_id
EXPLAIN FORMATTED
SELECT user_id, event_type, COUNT(*) AS n
FROM analytics.events
WHERE event_ts >= TIMESTAMP '2026-04-25 00:00:00'
  AND event_ts <  TIMESTAMP '2026-04-30 00:00:00'
  AND event_type IN ('click', 'purchase')
GROUP BY user_id, event_type;

-- == Physical Plan ==
-- AdaptiveSparkPlan isFinalPlan=false
-- +- HashAggregate(keys=[user_id, event_type], functions=[count(1)])
--    +- Exchange hashpartitioning(user_id, event_type, 200)
--       +- HashAggregate(keys=[user_id, event_type], functions=[partial_count(1)])
--          +- Project [user_id, event_type]
--             +- Filter (event_type IN ('click','purchase'))
--                +- BatchScan analytics.events[user_id, event_type, event_ts]
--                   PartitionFilters: [days(event_ts) IN (20568, 20569, 20570, 20571, 20572)]
--                   DataFilters: [event_type IN ('click','purchase')]
--                   PushedFilters: [In(event_type, [click, purchase])]
--                   ReadSchema: struct<user_id:bigint,event_type:string,event_ts:timestamp>

What this plan reveals: PartitionFilters shows the engine pruned to 5 daily partitions out of however many exist (partition pruning fired); PushedFilters shows the event_type filter was pushed to the Parquet reader (predicate pushdown fired); ReadSchema shows only three columns are read out of however many the table has (column pruning fired); the partial_count + Exchange + count shape is a two-stage aggregation — partial counts computed locally, shuffled by the GROUP BY key, finalized.

If PartitionFilters were empty, pruning failed — usually because the predicate references a non-partition column or uses a function the optimizer can't push through (Iceberg's hidden partitioning solves the function-pushdown problem in most cases). The Databricks engineering blog (databricks.com/blog/category/engineering) has detailed posts on Catalyst optimizer rules; Trino docs (trino.io/docs/current) cover connector-level pushdown.
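
For reference, a sketch of the DDL that makes the plan above possible: Spark SQL syntax for an Iceberg table with a hidden daily partition transform (the bucket count is an arbitrary illustration):

-- Partition by a transform of event_ts: queries filter the raw
-- timestamp column and still prune, with no derived date column.
CREATE TABLE analytics.events (
  user_id    BIGINT,
  event_type STRING,
  event_ts   TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_ts), bucket(64, user_id));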

Broadcast joins vs shuffle joins — when each wins

Joins are the operation where data engineers feel the cluster. A poorly chosen join strategy turns a 30-second query into a 30-minute query and a 30-minute query into an out-of-memory failure. Two strategies dominate, and a senior data engineer can name which one fired and force the other when the optimizer guesses wrong.

Broadcast hash join ships one side of the join to every worker, where it is held in a hash table; the other side streams through and probes the hash table per row. There is no shuffle of the streamed side. The trade-off: the broadcast side must fit in executor memory. Spark's default broadcast threshold is spark.sql.autoBroadcastJoinThreshold = 10MB; Databricks Runtime defaults higher, and the threshold can be tuned up to ~1 GB on well-provisioned executors. Network cost is the small side replicated once per worker, which is cheap next to shuffling the large side; the large side never crosses the network at all.
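
Raising the threshold is a one-line session setting; the value below is an illustration, not a recommendation:

-- Spark SQL: threshold is in bytes; -1 disables broadcast entirely.
SET spark.sql.autoBroadcastJoinThreshold = 104857600;  -- 100 MB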

Shuffle hash join and sort-merge join partition both sides by the join key across the cluster. Each worker then joins matching partitions locally. The cost is a full shuffle of both inputs — every row crosses the network at least once, sorted or hashed by the join key. Sort-merge join (Spark's default for large joins) sorts each partition and merges in order; shuffle hash join builds an in-memory hash table per partition. Both are O(N+M) but with substantial network and disk I/O.

The decision rule:

  • One side is small (dimension table, lookup, recent slice): broadcast. A 50 MB dimension table joined to a 5 TB fact table is a textbook broadcast.
  • Both sides are large: shuffle (sort-merge in Spark, distributed hash join in Trino). The optimizer should pick this automatically when statistics are accurate.
  • Highly skewed join key: Adaptive Query Execution (Spark 3+) detects skew and splits hot partitions; in Trino, dynamic filtering helps. Salting (appending a random suffix to the hot key) is the manual fallback; a minimal salting sketch follows this list.
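
A minimal salting sketch in Spark SQL, reusing the illustrative tables from this page; the salt count of 8 is arbitrary and should scale with the observed skew:

-- Large side: scatter each row across 8 salts at random.
WITH fact_salted AS (
  SELECT f.*, CAST(FLOOR(RAND() * 8) AS INT) AS salt
  FROM analytics.events f
),
-- Small side: replicate every row once per salt value so each
-- (key, salt) pair on the large side still finds its match.
dim_salted AS (
  SELECT d.*, s.salt
  FROM reference.user_dim d
  CROSS JOIN (SELECT explode(sequence(0, 7)) AS salt) s
)
SELECT f.user_id, f.event_type, d.country
FROM fact_salted f
JOIN dim_salted d
  ON f.user_id = d.user_id
 AND f.salt = d.salt;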

When the optimizer picks wrong — usually because table statistics are stale or missing — force the strategy with a hint:

-- Spark: force a broadcast of the dimension side
SELECT /*+ BROADCAST(d) */ f.user_id, f.event_type, d.country
FROM analytics.events f
JOIN reference.user_dim d ON f.user_id = d.user_id
WHERE f.event_date = DATE '2026-04-29';

-- Spark: force a sort-merge join (disable broadcast)
SELECT /*+ MERGEJOIN(f, h) */ f.order_id, h.shipped_at
FROM analytics.fact_orders f
JOIN analytics.fact_shipments h
  ON f.order_id = h.order_id;

-- Trino: no inline join hints; control distribution per session:
-- SET SESSION join_distribution_type = 'BROADCAST';
SELECT f.user_id, d.country
FROM analytics.events f
JOIN reference.user_dim d ON f.user_id = d.user_id;
-- Trino chooses BROADCAST automatically when the build side is
-- estimated to fit; override with the session property when
-- statistics are wrong.

After applying a hint, re-run EXPLAIN and confirm the plan shows BroadcastHashJoin (Spark) or a join with Distribution: REPLICATED (Trino) instead of SortMergeJoin / Distribution: PARTITIONED. Hints are advisory; if the build side genuinely doesn't fit, Spark falls back to a shuffle join with a warning. Update statistics (ANALYZE TABLE ... COMPUTE STATISTICS in Spark, ANALYZE in Trino, automatic in Snowflake) so the optimizer makes the right call without hints. Joe Reis and Matt Housley's Fundamentals of Data Engineering (oreilly.com/library/view/fundamentals-of-data/9781098108298) frames the trade-offs in production terms.
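
The statistics refresh itself is short in both engines (table name illustrative):

-- Spark: table- and column-level statistics for the cost-based optimizer.
ANALYZE TABLE reference.user_dim COMPUTE STATISTICS;
ANALYZE TABLE reference.user_dim COMPUTE STATISTICS FOR COLUMNS user_id, country;

-- Trino: statistics collection for supported connectors.
ANALYZE reference.user_dim;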

Reading EXPLAIN plans: Spark vs Trino vs Snowflake

Reading query plans is the diagnostic core of senior data-engineering work. Vocabulary differs across engines, but the mental model is the same: identify the operators that dominate runtime, check whether pruning and pushdown fired, verify the join strategy, look for spilling or skew.

Spark. EXPLAIN FORMATTED returns the physical plan with operator nodes and (when present) PartitionFilters, PushedFilters, ReadSchema. EXPLAIN COST adds row-count estimates. The Spark UI shows the same information with stage-by-stage timing and shuffle bytes. Look for: BroadcastHashJoin vs SortMergeJoin, WholeStageCodegen blocks (operators fused into one JVM method), Exchange nodes (each is a shuffle), and BatchScan with empty PartitionFilters (full table scan).

Trino. EXPLAIN (TYPE DISTRIBUTED) shows the plan as fragments — units of work distributed across workers — with exchange operators between them; joins are annotated Distribution: PARTITIONED or Distribution: REPLICATED (replicated is Trino's broadcast). EXPLAIN ANALYZE runs the query and annotates each operator with actual row counts, CPU time, and memory. Look for connector-level pushdown in the scan operator and dynamic filter operators on the probe side of joins.
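
A minimal invocation; exact output text varies across Trino versions, so read the plan for the annotations described above rather than literal strings:

-- Runs the query, then prints the distributed plan with per-operator
-- actual rows, CPU time, and memory.
EXPLAIN ANALYZE
SELECT user_id, COUNT(*) AS n
FROM analytics.events
WHERE event_ts >= TIMESTAMP '2026-04-29 00:00:00'
GROUP BY user_id;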

Snowflake hides the planner. EXPLAIN returns text; the canonical tool is the Query Profile UI, which shows the operator tree with bytes scanned, partitions scanned, partitions pruned, and percentage time per operator. The pruning ratio (Partitions scanned / Partitions total) is the key number — 12 of 4,800 means good pruning, 4,800 of 4,800 means a full table scan. Most Snowflake optimization comes from writing clean SQL with selective WHERE clauses on clustering keys, not tuning execution.

Databricks Photon is a vectorized C++ engine that replaces JVM whole-stage codegen for supported operators (scan, filter, project, aggregate, hash join, certain window functions). Look for operator names prefixed with Photon in the plan. UDFs and unsupported functions fall back to the JVM. The Databricks engineering blog has detailed posts on Photon eligibility in 2026.

The cross-engine cheatsheet:

Concept                | Spark                    | Trino                       | Snowflake
-----------------------|--------------------------|-----------------------------|------------------------
Plan command           | EXPLAIN FORMATTED        | EXPLAIN (TYPE DISTRIBUTED)  | EXPLAIN / Query Profile
With runtime stats     | Spark UI / EXPLAIN COST  | EXPLAIN ANALYZE             | Query Profile UI
Broadcast join         | BroadcastHashJoin        | Distribution: REPLICATED    | (chosen automatically)
Shuffle join           | SortMergeJoin / Exchange | Distribution: PARTITIONED   | (chosen automatically)
Partition pruning      | PartitionFilters         | Connector pushdown          | Partitions scanned/total
Predicate pushdown     | PushedFilters            | Pushdown in scan operator   | Filter % in profile
Vectorized execution   | WholeStageCodegen/Photon | (built-in)                  | (built-in)

The senior+ bar in 2026: you can run EXPLAIN against any of the three engines, identify the join strategy and whether pruning/pushdown fired, and explain why a query is slow with a specific operator and statistic — not "it's probably the join". Markus Winand's Use the Index, Luke! (use-the-index-luke.com) is the canonical SQL query-plan reference; the Snowflake SQL reference (docs.snowflake.com/en/sql-reference) covers engine-specific syntax.

Frequently asked questions

When should I use a window function vs a GROUP BY?
Use GROUP BY when you want one row per group. Use a window function when you want every input row preserved with an aggregate or rank attached. Sessionization, deduplication (ROW_NUMBER), running totals, percentile bucketing (NTILE), and lag-based diffs all require window functions because GROUP BY collapses rows. The Spark SQL reference and Trino docs document the supported window functions and frame specifications.
What's the difference between a CTE and a subquery in Spark/Trino?
Semantically they're equivalent — modern engines (Spark 3+, Trino, Snowflake) inline non-recursive CTEs and produce the same physical plan as the equivalent subquery. The difference is readability: CTEs let you name and reuse a query block, which matters for maintainability. Recursive CTEs are a separate construct (WITH RECURSIVE) for graph traversal and have no subquery equivalent.
How do I know if partition pruning fired in my Spark query?
Run EXPLAIN FORMATTED and look at the BatchScan or FileScan operator for the table. The PartitionFilters field shows which predicates were pushed to the partition layer; if it's empty, no pruning happened. Common causes: the WHERE clause references a non-partition column, the predicate uses a function the optimizer can't push (e.g., DATE(event_ts) on a Hive-partitioned column), or partition statistics are stale. Iceberg's hidden partitioning solves the function-pushdown problem in most cases.
When should I broadcast a join vs let Spark shuffle?
Broadcast when one side is small enough to fit in executor memory — typically under 100 MB for safety, up to ~1 GB on well-provisioned clusters. The broadcast side is shipped to every worker and held in a hash table; the large side streams through with no shuffle. Shuffle (sort-merge join) when both sides are large; the cost is a full shuffle but it scales. Force broadcast with /*+ BROADCAST(t) */ in Spark or SET SESSION join_distribution_type = 'BROADCAST' in Trino when the optimizer guesses wrong.
What is predicate pushdown and why does it matter for Parquet/Iceberg?
Predicate pushdown moves WHERE filters into the storage layer so the engine reads less data. For Parquet, the filter is evaluated against row-group min/max statistics in the footer — entire row groups are skipped without decompression. For Iceberg, the filter is evaluated against manifest-file statistics, eliminating whole files before they're opened. The result: a query with a selective WHERE clause reads megabytes instead of gigabytes from object storage. Verify pushdown fired by checking PushedFilters in the Spark plan or the connector pushdown line in Trino's EXPLAIN.
What does Photon do that vanilla Spark doesn't?
Photon is a vectorized C++ execution engine that replaces Spark's JVM whole-stage codegen for supported operators (scan, filter, project, aggregate, hash join, sort, certain window functions). It processes batches of rows in CPU-cache-friendly columnar format, leveraging SIMD instructions. The result is typically 2-5x speedup on scan-heavy and aggregation-heavy workloads. UDFs and unsupported functions cause fall-back to the JVM; look for Photon-prefixed operator names in the Spark plan to verify it engaged.
How does Trino's coordinator-worker model differ from Spark's driver-executor model?
Conceptually similar — a coordinator (driver) plans the query and assigns work to workers (executors). The difference is execution model: Trino is an MPP query engine designed for interactive SQL — workers exchange data via streaming pipelines and the entire query is one continuous execution. Spark is a general-purpose engine that materializes intermediate results between stages, supporting both batch ETL and ad-hoc SQL. Trino wins on interactive latency for queries that fit in memory; Spark wins on long batch jobs that need spill-to-disk and complex DAGs.
How do I handle skewed joins in Spark?
Adaptive Query Execution (Spark 3+) detects skew at runtime and splits hot partitions into sub-partitions, then duplicates the matching rows on the build side — enable with spark.sql.adaptive.skewJoin.enabled=true. For severe skew, salt the hot key manually: append a random suffix (0-N) to the hot key on the large side, replicate the matching row N times on the small side with each suffix. Salting is the manual fallback when AQE doesn't fully resolve the skew.
What's the right way to upsert into an Iceberg or Delta table?
Use MERGE. It expresses INSERT, UPDATE, and DELETE atomically against a target table given a source dataset and a join condition. MERGE is the canonical write primitive for slowly-changing-dimension and CDC pipelines as of 2026 — supported natively in Spark SQL, Trino-Iceberg, Databricks, and Snowflake. The alternative (DELETE + INSERT in two transactions) is not atomic and can produce inconsistent reads if a concurrent reader sees the gap.

Sources

  1. Apache Spark SQL Reference. Canonical documentation for Spark SQL syntax including window functions, recursive CTEs, MERGE, and EXPLAIN output formats.
  2. Trino Documentation. Coverage of the coordinator-worker architecture, connector-level predicate pushdown, dynamic filtering, and EXPLAIN (TYPE DISTRIBUTED) plan output.
  3. Snowflake SQL Reference. Canonical reference for Snowflake-specific SQL including QUALIFY, MERGE, micro-partition pruning, and the Query Profile UI.
  4. Databricks Engineering Blog. Deep technical posts on the Catalyst optimizer, Adaptive Query Execution, Photon vectorized engine, and Delta Lake internals.
  5. Use the Index, Luke! (Markus Winand). The canonical guide to SQL indexing and query plans across Postgres, MySQL, Oracle, and SQL Server — foundational for understanding any RDBMS query plan.
  6. Fundamentals of Data Engineering (Joe Reis & Matt Housley, O'Reilly). Frames query-engine choices and SQL architectural decisions in production terms across the data engineering lifecycle.

About the author. Blake Crosley founded ResumeGeni and writes about data engineering, hiring technology, and ATS optimization. More writing at blakecrosley.com.