FDW and parallel execution

Lists: pgsql-hackers
From: Konstantin Knizhnik
To: Robert Haas , PostgreSQL-development
Subject: FDW and parallel execution
Date: 2017-04-02 13:30:24
Message-ID: [email protected]

Hi hackers, and Robert personally (you are the best expert in both areas).
I want to ask one more question concerning parallel execution and FDW.
Below are two plans for the same query (TPC-H Q5): one for normal tables, the other for an FDW over a vertical representation of the same data.
The FDW supports ANALYZE and is expected to produce statistics similar to those of the original tables.

Query plans are the following:

Normal tables:

Sort (cost=2041588.48..2041588.98 rows=200 width=48)
  Sort Key: (sum((lineitem.l_extendedprice * ('1'::double precision - lineitem.l_discount)))) DESC
  -> Finalize GroupAggregate (cost=2041335.76..2041580.83 rows=200 width=48)
       Group Key: nation.n_name
       -> Gather Merge (cost=2041335.76..2041568.33 rows=1400 width=48)
            Workers Planned: 7
            -> Partial GroupAggregate (cost=2040335.64..2040396.71 rows=200 width=48)
                 Group Key: nation.n_name
                 -> Sort (cost=2040335.64..2040345.49 rows=3938 width=40)
                      Sort Key: nation.n_name
                      -> Hash Join (cost=605052.97..2040100.48 rows=3938 width=40)
                           Hash Cond: ((orders.o_custkey = customer.c_custkey) AND (nation.n_nationkey = customer.c_nationkey))
                           -> Hash Join (cost=525126.37..1951647.85 rows=98414 width=52)
                                Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
                                -> Hash Join (cost=3802.22..1404473.37 rows=654240 width=52)
                                     Hash Cond: (lineitem.l_suppkey = supplier.s_suppkey)
                                     -> Parallel Seq Scan on lineitem (cost=0.00..1361993.36 rows=8569436 width=16)
                                     -> Hash (cost=3705.97..3705.97 rows=7700 width=44)
                                          -> Hash Join (cost=40.97..3705.97 rows=7700 width=44)
                                               Hash Cond: (supplier.s_nationkey = nation.n_nationkey)
                                               -> Seq Scan on supplier (cost=0.00..3090.00 rows=100000 width=8)
                                               -> Hash (cost=40.79..40.79 rows=15 width=36)
                                                    -> Hash Join (cost=20.05..40.79 rows=15 width=36)
                                                         Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                                         -> Seq Scan on nation (cost=0.00..17.70 rows=770 width=40)
                                                         -> Hash (cost=20.00..20.00 rows=4 width=4)
                                                              -> Seq Scan on region (cost=0.00..20.00 rows=4 width=4)
                                                                   Filter: ((r_name)::text = 'ASIA'::text)
                                -> Hash (cost=484302.37..484302.37 rows=2256542 width=8)
                                     -> Seq Scan on orders (cost=0.00..484302.37 rows=2256542 width=8)
                                          Filter: ((o_orderdate >= '1996-01-01'::date) AND (o_orderdate < '1997-01-01'::date))
                           -> Hash (cost=51569.64..51569.64 rows=1499864 width=8)
                                -> Seq Scan on customer (cost=0.00..51569.64 rows=1499864 width=8)

Plan with FDW:

Sort (cost=2337312.28..2337312.78 rows=200 width=48)
  Sort Key: (sum((lineitem_fdw.l_extendedprice * ('1'::double precision - lineitem_fdw.l_discount)))) DESC
  -> GroupAggregate (cost=2336881.54..2337304.64 rows=200 width=48)
       Group Key: nation.n_name
       -> Sort (cost=2336881.54..2336951.73 rows=28073 width=40)
            Sort Key: nation.n_name
            -> Hash Join (cost=396050.65..2334807.39 rows=28073 width=40)
                 Hash Cond: ((orders_fdw.o_custkey = customer_fdw.c_custkey) AND (nation.n_nationkey = customer_fdw.c_nationkey))
                 -> Hash Join (cost=335084.53..2247223.46 rows=701672 width=52)
                      Hash Cond: (lineitem_fdw.l_orderkey = orders_fdw.o_orderkey)
                      -> Hash Join (cost=2887.07..1786058.18 rows=4607421 width=52)
                           Hash Cond: (lineitem_fdw.l_suppkey = supplier_fdw.s_suppkey)
                           -> Foreign Scan on lineitem_fdw (cost=0.00..1512151.52 rows=59986176 width=16)
                           -> Hash (cost=2790.80..2790.80 rows=7702 width=44)
                                -> Hash Join (cost=40.97..2790.80 rows=7702 width=44)
                                     Hash Cond: (supplier_fdw.s_nationkey = nation.n_nationkey)
                                     -> Foreign Scan on supplier_fdw (cost=0.00..2174.64 rows=100032 width=8)
                                     -> Hash (cost=40.79..40.79 rows=15 width=36)
                                          -> Hash Join (cost=20.05..40.79 rows=15 width=36)
                                               Hash Cond: (nation.n_regionkey = region.r_regionkey)
                                               -> Seq Scan on nation (cost=0.00..17.70 rows=770 width=40)
                                               -> Hash (cost=20.00..20.00 rows=4 width=4)
                                                    -> Seq Scan on region (cost=0.00..20.00 rows=4 width=4)
                                                         Filter: ((r_name)::text = 'ASIA'::text)
                      -> Hash (cost=294718.76..294718.76 rows=2284376 width=8)
                           -> Foreign Scan on orders_fdw (cost=0.00..294718.76 rows=2284376 width=8)
                 -> Hash (cost=32605.64..32605.64 rows=1500032 width=8)
                      -> Foreign Scan on customer_fdw (cost=0.00..32605.64 rows=1500032 width=8)

The plans look very similar, but the first one is parallel and the second is not.
My FDW provides an implementation of IsForeignScanParallelSafe, which returns true.
I wonder what could prevent the optimizer from using a parallel plan in this case.

Thanks in advance,

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company


From: Kyotaro HORIGUCHI
To: k(dot)knizhnik(at)postgrespro(dot)ru
Cc: robertmhaas(at)gmail(dot)com, pgsql-hackers(at)postgresql(dot)org
Subject: Re: FDW and parallel execution
Date: 2017-04-04 10:29:09
Message-ID: [email protected]

Hi,

At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik wrote in <58E0FCF0(dot)2070603(at)postgrespro(dot)ru>
> Hi hackers, and Robert personally (you are the best expert in both
> areas).
> I want to ask one more question concerning parallel execution and FDW.
> Below are two plans for the same query (TPC-H Q5): one for normal
> tables, the other for an FDW over a vertical representation of the
> same data.
> The FDW supports ANALYZE and is expected to produce statistics
> similar to those of the original tables.

> The plans look very similar, but the first one is parallel and the
> second is not.
> My FDW provides an implementation of IsForeignScanParallelSafe, which
> returns true.
> I wonder what could prevent the optimizer from using a parallel plan
> in this case.

Parallel execution requires partial paths. Creating them is the job of
your FDW's GetForeignPaths.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center


From: Konstantin Knizhnik
To: Kyotaro HORIGUCHI
Cc: robertmhaas(at)gmail(dot)com, pgsql-hackers(at)postgresql(dot)org
Subject: Re: FDW and parallel execution
Date: 2017-04-11 17:08:46
Message-ID: [email protected]

On 04.04.2017 13:29, Kyotaro HORIGUCHI wrote:
> Hi,
>
> At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik wrote in <58E0FCF0(dot)2070603(at)postgrespro(dot)ru>
>> Hi hackers, and Robert personally (you are the best expert in both
>> areas).
>> I want to ask one more question concerning parallel execution and FDW.
>> Below are two plans for the same query (TPC-H Q5): one for normal
>> tables, the other for an FDW over a vertical representation of the
>> same data.
>> The FDW supports ANALYZE and is expected to produce statistics
>> similar to those of the original tables.
>
>> The plans look very similar, but the first one is parallel and the
>> second is not.
>> My FDW provides an implementation of IsForeignScanParallelSafe, which
>> returns true.
>> I wonder what could prevent the optimizer from using a parallel plan
>> in this case.
> Parallel execution requires partial paths. Creating them is the job of
> your FDW's GetForeignPaths.

Thank you very much for the explanation.
But unfortunately I still do not completely understand which kinds of
queries allow parallel execution with an FDW.

The section "FDW Routines for Parallel Execution" of the FDW documentation says:
> A ForeignScan node can, optionally, support parallel execution. A
> parallel ForeignScan will be executed in multiple processes and should
> return each row only once across all cooperating processes. To do
> this, processes can coordinate through fixed size chunks of dynamic
> shared memory. This shared memory is not guaranteed to be mapped at
> the same address in every process, so pointers may not be used. The
> following callbacks are all optional in general, but required if
> parallel execution is to be supported.
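A minimal sketch of the coordination this paragraph describes, written as a standalone C11 model rather than PostgreSQL code: threads stand in for worker processes, and ParallelScanShared, estimate_shared_size(), init_shared() and worker_main() are hypothetical names that only loosely mirror EstimateDSMForeignScan, InitializeDSMForeignScan and InitializeWorkerForeignScan. The shared chunk is fixed-size and holds no pointers, and an atomic block counter ensures that each block (and therefore each row) is returned by exactly one participant.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_WORKERS 16
#define MAX_BLOCKS  100000

/*
 * Fixed-size shared chunk: plain values and an atomic counter only, never
 * pointers, because a DSM segment may be mapped at a different address in
 * each process.
 */
typedef struct ParallelScanShared
{
    uint64_t             nblocks;    /* total blocks to scan */
    atomic_uint_fast64_t next_block; /* first block nobody has claimed yet */
} ParallelScanShared;

/* How often each block was "returned"; used only to check the model. */
static atomic_int times_returned[MAX_BLOCKS];

/* Loosely mirrors EstimateDSMForeignScan: size of the shared chunk. */
static size_t estimate_shared_size(void)
{
    return sizeof(ParallelScanShared);
}

/* Loosely mirrors InitializeDSMForeignScan: the leader fills the chunk. */
static void init_shared(ParallelScanShared *shared, uint64_t nblocks)
{
    shared->nblocks = nblocks;
    atomic_init(&shared->next_block, 0);
}

/* Atomically claims the next block; false once the scan is exhausted. */
static bool claim_block(ParallelScanShared *shared, uint64_t *block)
{
    uint64_t b = atomic_fetch_add(&shared->next_block, 1);

    if (b >= shared->nblocks)
        return false;
    *block = b;
    return true;
}

/* One participant's scan loop; arg plays the role of the "coordinate"
 * pointer that InitializeWorkerForeignScan receives. */
static void *worker_main(void *arg)
{
    ParallelScanShared *shared = arg;
    uint64_t block;

    while (claim_block(shared, &block))
        atomic_fetch_add(&times_returned[block], 1);
    return NULL;
}

/* Runs the scan with up to nworkers threads plus the leader; returns the
 * number of blocks not returned exactly once (0 means success). */
static int run_parallel_scan(int nworkers, uint64_t nblocks)
{
    pthread_t threads[MAX_WORKERS];
    ParallelScanShared *shared;
    int started = 0;
    int bad = 0;

    assert(nworkers >= 0 && nworkers <= MAX_WORKERS && nblocks <= MAX_BLOCKS);
    shared = malloc(estimate_shared_size());
    if (shared == NULL)
        return -1;
    for (uint64_t i = 0; i < nblocks; i++)
        atomic_store(&times_returned[i], 0);
    init_shared(shared, nblocks);

    for (int i = 0; i < nworkers; i++)
        if (pthread_create(&threads[started], NULL, worker_main, shared) == 0)
            started++;
    worker_main(shared);              /* the leader participates too */
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    for (uint64_t i = 0; i < nblocks; i++)
        if (atomic_load(&times_returned[i]) != 1)
            bad++;
    free(shared);
    return bad;
}
```

For example, run_parallel_scan(4, 1000) returns 0 when every block was claimed by exactly one participant. A real FDW would keep such a struct in the DSM chunk that PostgreSQL passes to its callbacks and would use PostgreSQL's own atomics (port/atomics.h) instead of C11 <stdatomic.h>.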

I provided IsForeignScanParallelSafe, EstimateDSMForeignScan,
InitializeDSMForeignScan and InitializeWorkerForeignScan in my FDW.
IsForeignScanParallelSafe returns true.
Also, in the GetForeignPaths function I created a path with
baserel->consider_parallel == true.
Is that enough, or should I do something else?

But unfortunately I could not find any query (sequential scan, grand
aggregation, aggregation with GROUP BY, joins...) for which a parallel
execution plan is used with this FDW.
Also, there are no examples of using these functions in the Postgres
distribution, and I could not find any such examples on the Internet.

Can somebody please clarify how parallel execution works with FDWs,
and maybe point me to some examples?
Thanks in advance.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company


From: PostgreSQL - Hans-Jürgen Schönig
To: pgsql-hackers(at)postgresql(dot)org
Subject: Re: FDW and parallel execution
Date: 2017-04-11 17:20:04
Message-ID: [email protected]

hello ...

did you check out Antonin Houska's patches?
we basically have code that can do that.

many thanks,

hans

On 04/02/2017 03:30 PM, Konstantin Knizhnik wrote:
> The plans look very similar, but the first one is parallel and the second is not.
> My FDW provides an implementation of IsForeignScanParallelSafe, which
> returns true.
> I wonder what could prevent the optimizer from using a parallel plan in this case.

--
Hans-Jürgen Schönig
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt
Web: http://www.postgresql-support.de, http://www.cybertec.at


From: Kyotaro HORIGUCHI
To: k(dot)knizhnik(at)postgrespro(dot)ru
Cc: robertmhaas(at)gmail(dot)com, pgsql-hackers(at)postgresql(dot)org
Subject: Re: FDW and parallel execution
Date: 2017-04-13 07:49:49
Message-ID: [email protected]

Sorry for the too-brief reply.

At Tue, 11 Apr 2017 20:08:46 +0300, Konstantin Knizhnik wrote in <94c8692a-f299-b72b-6227-270b8a9ed7ad(at)postgrespro(dot)ru>
>
> On 04.04.2017 13:29, Kyotaro HORIGUCHI wrote:
> > Hi,
> >
> > At Sun, 02 Apr 2017 16:30:24 +0300, Konstantin Knizhnik
> > wrote in <58E0FCF0(dot)2070603(at)postgrespro(dot)ru>
> >> My FDW provides an implementation of IsForeignScanParallelSafe, which
> >> returns true.
> >> I wonder what could prevent the optimizer from using a parallel plan
> >> in this case.
> > Parallel execution requires partial paths. Creating them is the job of
> > your FDW's GetForeignPaths.
>
> Thank you very much for the explanation.
> But unfortunately I still do not completely understand which kinds of
> queries allow parallel execution with an FDW.

At Tue, 11 Apr 2017 19:20:04 +0200, PostgreSQL - Hans-Jürgen Schönig wrote in <0c9c101d-0fbb-1e19-f04c-7a6ec577d960(at)cybertec(dot)at>
> did you check out Antonin Houska's patches?
> we basically have code that can do that.

Parallel aggregation is already available. Antonin's patch implements
partition-wise aggregation, which, I suppose, speeds up the case where
the partition key is the aggregation key. Parallel aggregation seems to
be considered whenever an appropriate partial path is available. (I
haven't tried anything, though.)
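The two-phase shape behind parallel aggregation (Partial GroupAggregate in each worker, Finalize GroupAggregate above Gather Merge in the first plan) can be modeled in a few lines of standalone C. This is a sketch under simplifying assumptions, not PostgreSQL's executor: Row, partial_agg(), finalize_agg() and parallel_sum() are hypothetical, each worker gets a disjoint contiguous slice of the input, and the aggregate is a plain per-group sum whose combine step is addition.

```c
#include <assert.h>
#include <stddef.h>

#define NGROUPS     5   /* e.g. the five nations of one TPC-H region */
#define MAX_WORKERS 16

typedef struct Row
{
    int    group;    /* grouping key, 0 .. NGROUPS-1 */
    double revenue;  /* value being summed */
} Row;

/* Partial phase: one worker aggregates only the rows it scanned. */
static void partial_agg(const Row *rows, size_t n, double state[NGROUPS])
{
    for (int g = 0; g < NGROUPS; g++)
        state[g] = 0.0;
    for (size_t i = 0; i < n; i++)
    {
        assert(rows[i].group >= 0 && rows[i].group < NGROUPS);
        state[rows[i].group] += rows[i].revenue;
    }
}

/* Finalize phase: the leader combines the per-worker transition states. */
static void finalize_agg(double states[][NGROUPS], int nworkers,
                         double result[NGROUPS])
{
    for (int g = 0; g < NGROUPS; g++)
    {
        result[g] = 0.0;
        for (int w = 0; w < nworkers; w++)
            result[g] += states[w][g];
    }
}

/* Gives each worker a disjoint slice of the rows, then runs both phases. */
static void parallel_sum(const Row *rows, size_t n, int nworkers,
                         double result[NGROUPS])
{
    double states[MAX_WORKERS][NGROUPS];
    size_t chunk;

    assert(nworkers > 0 && nworkers <= MAX_WORKERS);
    chunk = (n + (size_t) nworkers - 1) / (size_t) nworkers;
    for (int w = 0; w < nworkers; w++)
    {
        size_t start = (size_t) w * chunk;
        size_t end = start + chunk;

        if (start > n)
            start = n;
        if (end > n)
            end = n;
        partial_agg(rows + start, end - start, states[w]);
    }
    finalize_agg(states, nworkers, result);
}
```

The result is correct only because the slices are disjoint: if every worker aggregated all rows, each group's sum would be multiplied by the number of participants. That is why the partial aggregate has to sit on a partial path, where each worker sees only its own share of the rows.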

set_plain_rel_pathlist() does this work for plain relations, so
what we should do in GetForeignPaths would be as follows.

- Check rel->consider_parallel (this won't be required, since the FDW
knows that already) and rel->lateral_relids (set_plain_rel_pathlist()
only builds partial paths when it is empty).

- If parallel execution is OK, create a path with
create_foreignscan_path() in the ordinary way, then change its
parallel-related members as necessary.

- As create_plain_partial_paths() does, check certain conditions (such
as whether any workers would be used) and finally pass the created
partial foreign scan path to add_partial_path().

I haven't really done this, so I might be wrong.
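The steps above can be sketched as a standalone C model. It is not PostgreSQL code: ModelRel, ModelPath and the model_* functions are hypothetical stand-ins for RelOptInfo, ForeignPath, create_foreignscan_path(), compute_parallel_worker() and add_partial_path(), and the cost, row-split and worker-count constants are assumptions. A real FDW would call the planner's own functions, whose exact signatures differ between PostgreSQL versions.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PATHS 8

/* Hypothetical stand-in for a ForeignPath; not PostgreSQL's struct. */
typedef struct ModelPath
{
    double rows;             /* estimated rows (per process if partial) */
    double total_cost;
    bool   parallel_aware;   /* true only for the partial path */
    bool   parallel_safe;
    int    parallel_workers;
} ModelPath;

/* Hypothetical stand-in for RelOptInfo; not PostgreSQL's struct. */
typedef struct ModelRel
{
    bool      consider_parallel;
    bool      has_lateral_refs;  /* models rel->lateral_relids != NULL */
    double    rows;
    double    pages;
    ModelPath pathlist[MAX_PATHS];
    int       npaths;
    ModelPath partial_pathlist[MAX_PATHS];
    int       npartial;
} ModelRel;

/* Assumed heuristic shaped like compute_parallel_worker(): no workers
 * below a size threshold, then one more per tripling of the size. */
static int model_parallel_workers(double pages, int max_workers)
{
    double threshold = 1024;   /* assumed minimum size, in pages */
    int    workers = 0;

    while (pages >= threshold && workers < max_workers)
    {
        workers++;
        threshold *= 3;
    }
    return workers;
}

/* Builds a foreign scan path "in the ordinary way" (cost = pages, assumed). */
static ModelPath model_foreignscan_path(const ModelRel *rel)
{
    ModelPath p = { rel->rows, rel->pages, false, rel->consider_parallel, 0 };

    return p;
}

/* Models the three steps; returns true if a partial path was added. */
static bool model_get_foreign_paths(ModelRel *rel, int max_workers)
{
    ModelPath full = model_foreignscan_path(rel);
    ModelPath partial;
    int       workers;

    assert(rel->npaths < MAX_PATHS && rel->npartial < MAX_PATHS);
    rel->pathlist[rel->npaths++] = full;              /* like add_path() */

    /* Step 1: require consider_parallel and no lateral references. */
    if (!rel->consider_parallel || rel->has_lateral_refs)
        return false;

    /* Step 3's condition: skip relations too small for any worker. */
    workers = model_parallel_workers(rel->pages, max_workers);
    if (workers <= 0)
        return false;

    /* Step 2: same path with the parallel-related members changed. */
    partial = full;
    partial.parallel_aware = true;
    partial.parallel_workers = workers;
    partial.rows = full.rows / (workers + 1);         /* crude split, assumed */
    partial.total_cost = full.total_cost / (workers + 1);
    rel->partial_pathlist[rel->npartial++] = partial; /* like add_partial_path() */
    return true;
}
```

With a large relation the model adds one ordinary path and one partial path; with lateral references, consider_parallel off, or a tiny relation it adds only the ordinary path, which, like the FDW plan in the first message, leaves nothing for a Gather node to build on.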

> The section "FDW Routines for Parallel Execution" of the FDW
> documentation says:
> > A ForeignScan node can, optionally, support parallel execution. A
> > parallel ForeignScan will be executed in multiple processes and should
> > return each row only once across all cooperating processes. To do
> > this, processes can coordinate through fixed size chunks of dynamic
> > shared memory. This shared memory is not guaranteed to be mapped at
> > the same address in every process, so pointers may not be used. The
> > following callbacks are all optional in general, but required if
> > parallel execution is to be supported.
>
> I provided IsForeignScanParallelSafe, EstimateDSMForeignScan,
> InitializeDSMForeignScan and InitializeWorkerForeignScan in my FDW.
> IsForeignScanParallelSafe returns true.
> Also, in the GetForeignPaths function I created a path with
> baserel->consider_parallel == true.
> Is that enough, or should I do something else?

Creating partial paths, I think. create_grouping_paths() considers
parallel aggregation only when input_rel has a non-empty partial_pathlist.

That section explains the FDW routines provided specifically for
parallel execution, but it doesn't seem to explain how to get a
parallel plan as a whole.

> But unfortunately I could not find any query (sequential scan, grand
> aggregation, aggregation with GROUP BY, joins...) for which a parallel
> execution plan is used with this FDW.
> Also, there are no examples of using these functions in the Postgres
> distribution, and I could not find any such examples on the Internet.

Maybe you're a pioneer in this area.

> Can somebody please clarify how parallel execution works with FDWs,
> and maybe point me to some examples?
> Thanks in advance.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center