How Parallel Plans Start Up—Part 5


This is the final part of a five-part series taking a deep dive into the way SQL Server row mode parallel plans start executing. Part 1 initialized execution context zero for the parent task, and part 2 created the query scan tree. Part 3 started the query scan, performed some early phase processing, and started the first additional parallel tasks in Branch C. Part 4 described exchange synchronization, and the start up of parallel plan Branches C and D.

4. Branch B Parallel Tasks Start

Parallel branches

This is the fourth stage in the execution sequence:

  1. Branch A (parent task).
  2. Branch C (additional parallel tasks).
  3. Branch D (additional parallel tasks).
  4. Branch B (additional parallel tasks).

The only thread active right now (not suspended on CXPACKET) is the parent task, which is at the consumer side of the repartition streams exchange at node 11 in Branch B:

Branch B

The parent task now returns from nested early phases calls, setting elapsed and CPU times in profilers as it goes. First and last active times are not updated during early phase processing. Remember these numbers are being recorded against execution context zero—the Branch B parallel tasks do not exist yet.

The parent task ascends the tree from node 11, through the stream aggregate at node 10 and the merge join at node 3, back to the gather streams exchange at node 2.

Early phase processing is now complete.

With the original EarlyPhases call at the node 2 gather streams exchange finally completed, the parent task returns to opening that exchange (you may just about remember that call from right at the start of this series). The open method at node 2 now calls CQScanExchangeNew::StartAllProducers to create the parallel tasks for Branch B.

The parent task now waits on CXPACKET at the consumer side of the node 2 gather streams exchange. This wait will continue until the newly-created Branch B tasks have completed their nested Open calls and returned to complete opening the producer side of the gather streams exchange.

Branch B parallel tasks open

The two new parallel tasks in Branch B start at the producer side of the node 2 gather streams exchange. Following the usual row mode iterative execution model, they call:

  • CQScanXProducerNew::Open (node 2 producer side open).
  • CQScanProfileNew::Open (profiler for node 3).
  • CQScanMergeJoinNew::Open (node 3 merge join).
  • CQScanProfileNew::Open (profiler for node 4).
  • CQScanStreamAggregateNew::Open (node 4 stream aggregate).
  • CQScanProfileNew::Open (profiler for node 5).
  • CQScanExchangeNew::Open (repartition streams exchange).
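
The nested call chain above can be modelled in a few lines. This is a minimal sketch in Python, not SQL Server code: the Iterator and Profiler classes and the build_branch_b_outer_path function are hypothetical names invented for illustration, and only the outer input of the merge join is modelled.

```python
class Iterator:
    """Toy row mode iterator: Open is forwarded to a single child."""

    def __init__(self, name, child=None):
        self.name = name
        self.child = child

    def open(self, trace):
        trace.append(self.name)       # record the call on the way down
        if self.child is not None:
            self.child.open(trace)    # nested Open call on the child


class Profiler(Iterator):
    """Toy profiler that sits directly above the iterator it measures."""

    def __init__(self, child):
        super().__init__(f"profiler for {child.name}", child)


def build_branch_b_outer_path():
    # Built bottom-up: the node 5 exchange is the deepest iterator reached.
    exchange = Iterator("node 5 exchange (consumer side)")
    aggregate = Iterator("node 4 stream aggregate", Profiler(exchange))
    join = Iterator("node 3 merge join", Profiler(aggregate))
    return Iterator("node 2 producer", Profiler(join))


trace = []
build_branch_b_outer_path().open(trace)
for line in trace:
    print(line)
```

Each Open call does nothing but call Open on the iterator below it, which is why the trace lists the operators in the same top-down order as the list above.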

The parallel tasks both follow the outer (upper) input to the merge join, just as the early phase processing did.

Completing the exchange

When the Branch B tasks arrive at the consumer side of the repartition streams exchange at node 5, each task:

  • Registers with the exchange port (CXPort).
  • Creates the pipes (CXPipe) that connect this task to one or more producer side tasks (depending on the type of exchange). The current exchange is a repartition streams, so each consumer task has two pipes (at DOP 2). Each consumer may receive rows from either of the two producers.
  • Adds a CXPipeMerge to merge rows from multiple pipes (since this is an order-preserving exchange).
  • Creates row packets (confusingly named CXPacket) used for flow control and to buffer rows across the exchange pipes. These are allocated from previously granted query memory.
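
To make the pipe layout concrete, here is a rough Python sketch of an order-preserving repartition at DOP 2. It is only an analogy: the pipes dictionary, the route function (a modulo partitioning rule), and the sample keys are all assumptions made for illustration, and heapq.merge stands in for the role played by CXPipeMerge.

```python
import heapq

DOP = 2  # degree of parallelism, as in the demo plan

# pipes[consumer][producer] is one pipe; each consumer owns DOP pipes.
pipes = {
    consumer: {producer: [] for producer in range(DOP)}
    for consumer in range(DOP)
}


def route(key):
    """Hypothetical partitioning rule: pick a consumer for a key."""
    return key % DOP


# Each producer emits its rows in key order (illustrative data only).
producer_rows = {0: [1, 4, 6, 7], 1: [2, 3, 5, 8]}
for producer, rows in producer_rows.items():
    for key in rows:
        pipes[route(key)][producer].append(key)


def consume(consumer):
    """Merge the consumer's sorted pipes into one sorted stream."""
    return list(heapq.merge(*pipes[consumer].values()))


print(consume(0))
print(consume(1))
```

Because every pipe is individually sorted, a merge across a consumer's pipes is enough to preserve order without sorting again.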

Once both consumer-side parallel tasks have completed that work, the node 5 exchange is ready to go. The two consumers (in Branch B) and the two producers (in Branch C) have all opened the exchange port, so the node 5 CXPACKET waits end.
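
The "everyone waits until all owners have opened the port" behaviour resembles a barrier. The sketch below uses Python's threading.Barrier as a loose analogy under that assumption. It does not claim to show how CXPort is implemented, and the task names and the open_exchange_side function are hypothetical.

```python
import threading

PRODUCERS = 2  # Branch C tasks at node 5
CONSUMERS = 2  # Branch B tasks at node 5

# The port counts as open once every producer and consumer has arrived.
port_open = threading.Barrier(PRODUCERS + CONSUMERS)
released = []
lock = threading.Lock()


def open_exchange_side(task_name):
    port_open.wait(timeout=5)   # analogous to waiting on CXPACKET
    with lock:
        released.append(task_name)


names = [
    "Branch C producer 1",
    "Branch C producer 2",
    "Branch B consumer 1",
    "Branch B consumer 2",
]
threads = [
    threading.Thread(target=open_exchange_side, args=(name,))
    for name in names
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(released))
```

No task gets past the wait until the last of the four arrives, which mirrors the Branch C producers sitting on CXPACKET until the Branch B consumers show up.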

Checkpoint

Parallel branches

As things stand:

  • The parent task in Branch A is waiting on CXPACKET at the consumer side of the node 2 gather streams exchange. This wait will continue until both node 2 producers return and open the exchange.
  • The two parallel tasks in Branch B are runnable. They have just opened the consumer side of the repartition streams exchange at node 5.
  • The two parallel tasks in Branch C have just been released from their CXPACKET wait, and are now runnable. The two stream aggregates at node 6 (one per parallel task) can begin aggregating rows from the two sorts at node 7. Recall the index seeks at node 9 closed some time ago, when the sorts completed their input phase.
  • The two parallel tasks in Branch D are waiting on CXPACKET at the producer side of the repartition streams exchange at node 11. They are waiting for the consumer side of node 11 to be opened by the two parallel tasks in Branch B. The index seeks have closed down, and the sorts are ready to transition to their output phase.

Multiple active branches

This is the first time we have had multiple branches (B and C) active at the same time, which could be challenging to discuss. Luckily, the design of the demo query is such that the stream aggregates in Branch C will produce only a few rows. The small number of narrow output rows will easily fit in the row packet buffers at the node 5 repartition streams exchange. The Branch C tasks can therefore get on with their work (and eventually close down) without waiting for the node 5 repartition streams consumer side to fetch any rows.

Conveniently, this means we can let the two Branch C parallel tasks run in the background without worrying about them. We need only concern ourselves with what the two Branch B parallel tasks are doing.

Branch B opening completes

A reminder of Branch B:

Branch B

The two parallel workers in Branch B return from their Open calls at the node 5 repartition streams exchange. This takes them back through the stream aggregate at node 4, to the merge join at node 3.

Because we are ascending the tree in the Open method, the profilers above node 5 and node 4 are recording last active time, as well as accumulating elapsed and CPU times (per task). We are not executing early phases on the parent task now, so the numbers recorded for execution context zero are not affected.

At the merge join, the two Branch B parallel tasks start descending the inner (lower) input, taking them through the stream aggregate at node 10 (and a couple of profilers) to the consumer side of the repartition streams exchange at node 11.

Branch D resumes execution

A repeat of the Branch C events at node 5 now occurs at the node 11 repartition streams. The consumer side of the node 11 exchange is completed and opened. The two producers in Branch D end their CXPACKET waits, becoming runnable again. We will let the Branch D tasks run in the background, placing their results in exchange buffers.

Branch D

There are now six parallel tasks (two each in Branches B, C, and D) cooperatively sharing time on the two schedulers assigned to additional parallel tasks in this query.

Branch A Opening Completes

The two parallel tasks in Branch B return from their Open calls at the node 11 repartition streams exchange, up past the node 10 stream aggregate, through the merge join at node 3, and back to the producer side of the gather streams at node 2. Profiler last active and accumulated elapsed & CPU times are updated as we ascend the tree in nested Open methods.

At the producer side of the gather streams exchange, the two Branch B parallel tasks synchronize opening the exchange port, then wait on CXPACKET for the consumer side to open.

The parent task waiting on the consumer side of the gather streams is now released from its CXPACKET wait, which allows it to complete opening the exchange port on the consumer side. This in turn releases the producers from their (brief) CXPACKET wait. The node 2 gather streams has now been opened by all owners.

Completing the Query Scan

The parent task now ascends the query scan tree from the gather streams exchange, returning from the Open calls at the exchange, segment, and sequence project operators in Branch A.

This completes opening the query scan tree, initiated all that while ago by the call to CQueryScan::StartupQuery. All branches of the parallel plan have now started executing.

Returning rows

The execution plan is ready to begin returning rows in response to GetRow calls at the root of the query scan tree, initiated by a call to CQueryScan::GetRow. I am not going to go into full detail, since it is strictly beyond the scope of an article about how parallel plans start up.

Still, the brief sequence is:

  • The parent task calls GetRow on the sequence project, which calls GetRow on the segment, which calls GetRow on the consumer side of the gather streams exchange.
  • If there are no rows available at the exchange yet, the parent task waits on CXCONSUMER.
  • Meanwhile, the independently-running Branch B parallel tasks have been recursively calling GetRow starting at the producer side of the gather streams exchange.
  • Rows are supplied to Branch B by the consumer sides of the repartition streams exchanges at nodes 5 and 11.
  • Branches C and D are still processing rows from their sorts through their respective stream aggregates. Branch B tasks may have to wait on CXCONSUMER at repartition streams nodes 5 and 11 for a complete packet of rows to become available.
  • Rows emerging from the nested GetRow calls in Branch B are assembled into row packets at the producer side of the gather streams exchange.
  • The parent task’s CXCONSUMER wait at the consumer side of the gather streams ends when a packet becomes available.
  • A row at a time is then processed through the parent operators in Branch A, and finally on to the client.
  • Eventually, the rows run out, and a nested Close call ripples down the tree, across the exchanges, and parallel execution comes to an end.
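
The packet-at-a-time handoff across an exchange can be approximated with a bounded queue. The following Python sketch assumes a single producer and a fixed packet size of three rows, neither of which comes from SQL Server. The producer and consume functions, PACKET_SIZE, and the END marker are hypothetical names.

```python
import queue
import threading

PACKET_SIZE = 3   # illustrative only; real packets are sized in bytes
END = None        # marker telling the consumer the producer has closed


def producer(rows, exchange):
    """Assemble rows into packets and push them across the exchange."""
    packet = []
    for row in rows:
        packet.append(row)
        if len(packet) == PACKET_SIZE:
            exchange.put(packet)   # blocks if the buffers are full
            packet = []
    if packet:
        exchange.put(packet)       # final, partially filled packet
    exchange.put(END)


def consume(exchange):
    """Yield one row at a time, waiting whenever no packet is ready."""
    while True:
        packet = exchange.get()    # analogous to a CXCONSUMER wait
        if packet is END:
            return
        yield from packet


exchange = queue.Queue(maxsize=2)
worker = threading.Thread(target=producer, args=(range(7), exchange))
worker.start()
rows = list(consume(exchange))
worker.join()
print(rows)
```

The consumer sees rows one at a time even though they cross the exchange in packets, much as the operators in Branch A do.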

Summary and Final Notes

First, a summary of the execution sequence of this particular parallel execution plan:

Parallel branches

  1. The parent task opens Branch A. Early phase processing begins at the gather streams exchange.
  2. Parent task early phase calls descend the scan tree to the index seek at node 9, then ascend back to the repartitioning exchange at node 5.
  3. The parent task starts parallel tasks for Branch C, then waits while they read all available rows into the blocking sort operators at node 7.
  4. Early phase calls ascend to the merge join, then descend the inner input to the exchange at node 11.
  5. Tasks for Branch D are started just as for Branch C, while the parent task waits at node 11.
  6. Early phase calls return from node 11 as far as the gather streams. The early phase ends here.
  7. The parent task creates parallel tasks for Branch B, and waits until the opening of Branch B is complete.
  8. Branch B tasks reach the node 5 repartition streams, synchronize, complete the exchange, and release Branch C tasks to start aggregating rows from the sorts.
  9. When Branch B tasks reach the node 11 repartition streams, they synchronize, complete the exchange, and release Branch D tasks to start aggregating rows from the sorts.
  10. Branch B tasks return to the gather streams exchange and synchronize, releasing the parent task from its wait. The parent task is now ready to start the process of returning rows to the client.

Additional notes

Ascending the query scan tree during early phase processing sets first and last active times at each profiling iterator for the parent task, but does not accumulate elapsed or CPU time. Ascending the tree during Open and GetRow calls on a parallel task sets last active time, and accumulates elapsed and CPU time at each profiling iterator per task.

Early phase processing is specific to row mode parallel plans. It is necessary to ensure exchanges are initialized in the correct order, and all the parallel machinery works correctly.

The parent task does not always perform the whole of early phase processing. Early phases start at a root exchange, but how those calls navigate the tree depends on the iterators encountered. I chose a merge join for this demo because it happens to require early phase processing for both inputs.

Early phases at (for example) a parallel hash join propagate down the build input only. When the hash join transitions to its probe phase, it opens iterators on that input, including any exchanges. Another round of early phase processing is initiated, handled by (exactly) one of the parallel tasks, playing the role of the parent task.

When early phase processing encounters a parallel branch containing a blocking iterator, it starts the additional parallel tasks for that branch, and waits for those producers to complete their opening phase. That branch may also have child branches, which are handled in the same way, recursively.
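
The recursive handling of child branches explains the start order seen in this series (C, then D, then B). Here is a small Python sketch of that ordering. It assumes a simplified model in which early phases visit every input of every iterator (true of the merge join in the demo, but not of a hash join) and in which every child branch contains a blocking sort. Node, early_phases, and start_up are hypothetical names.

```python
class Node:
    """Toy plan node; branch_below is set only on exchanges."""

    def __init__(self, name, children=(), branch_below=None):
        self.name = name
        self.children = list(children)
        self.branch_below = branch_below


def early_phases(node, started):
    """Descend each input fully, then start the branch below an exchange."""
    for child in node.children:
        early_phases(child, started)           # child branches go first
        if child.branch_below is not None:
            started.append(child.branch_below)


def start_up(root_exchange):
    started = []
    early_phases(root_exchange, started)
    # Only after early phases end are the root's own producers started.
    started.append(root_exchange.branch_below)
    return started


def blocking_branch():
    # Stream aggregate over a blocking sort over an index seek.
    return Node("stream aggregate", [Node("sort", [Node("index seek")])])


node5 = Node("node 5 repartition streams", [blocking_branch()], "C")
node11 = Node("node 11 repartition streams", [blocking_branch()], "D")
join = Node("node 3 merge join", [
    Node("node 4 stream aggregate", [node5]),
    Node("node 10 stream aggregate", [node11]),
])
gather = Node("node 2 gather streams", [join], "B")

print(start_up(gather))
```

The branch directly below the root exchange is started last because its tasks are only created once the root's early phase call has returned.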

Some branches in a row mode parallel plan may be required to run on a single thread (e.g. due to a global aggregate or top). These ‘serial zones’ also run on an additional ‘parallel’ task, the only difference being there is only one task, execution context, and worker for that branch. Early phase processing works the same regardless of the number of tasks assigned to a branch. For example, a ‘serial zone’ reports timings for the parent task (or a parallel task playing that role) as well as the single additional task. This manifests in showplan as data for “thread 0” (early phases) as well as “thread 1” (the additional task).

Closing thoughts

All this certainly represents an extra layer of complexity. The return on that investment is in runtime resource usage (primarily threads and memory), reduced synchronization waits, increased throughput, potentially accurate performance metrics, and a minimized chance of intra-query parallel deadlocks.

Though row mode parallelism has largely been eclipsed by the more modern batch mode parallel execution engine, the row mode design still has a certain beauty to it. Most iterators get to pretend they are still running in a serial plan, with almost all of the synchronization, flow control, and scheduling handled by the exchanges. The care and attention evident in implementation details like early phase processing enables even the largest parallel plans to execute successfully without the query designer giving too much thought to the practical difficulties.

Thank you for reading.