Spark RDD partition by key in exclusive way
I would like to partition an RDD by key so that each partition contains values of only a single key. For example, if I have 100 distinct key values and I repartition(102), the RDD should have 2 empty partitions and 100 partitions that each hold the values of a single key.
I tried groupByKey(k).repartition(102), but this does not guarantee the exclusivity of a key in each partition: I see some partitions containing values of more than one key, and more than 2 empty partitions.
Is there a way to do this with the standard API?
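To make the requirement concrete, here is a minimal sketch (the pair RDD name rdd is just a placeholder) of how the "one key per partition" property could be checked with glom():
# count distinct keys per partition; every partition should hold at most one
keysPerPartition = rdd.glom().map(lambda part: len(set(k for k, _ in part))).collect()
assert all(n <= 1 for n in keysPerPartition)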
apache-spark pyspark rdd
asked Nov 19 '18 at 21:56 by alexlipa
was my answer helpful on this? – vikrant rana, Dec 28 '18 at 12:38
2 Answers
To use partitionBy(), the RDD must consist of (key, value) tuples, i.e. a pair RDD. Let's look at an example.
Suppose I have an input file with the following data:
OrderId|OrderItem|OrderDate|OrderPrice|ItemQuantity
1|Gas|2018-01-17|1895|1
1|Air Conditioners|2018-01-28|19000|3
1|Television|2018-01-11|45000|2
2|Gas|2018-01-17|1895|1
2|Air Conditioners|2017-01-28|19000|3
2|Gas|2016-01-17|2300|1
1|Bottle|2018-03-24|45|10
1|Cooking oil|2018-04-22|100|3
3|Inverter|2015-11-02|29000|1
3|Gas|2014-01-09|2300|1
3|Television|2018-01-17|45000|2
4|Gas|2018-01-17|2300|1
4|Television$$|2018-01-17|45000|2
5|Medicine|2016-03-14|23.50|8
5|Cough Syrup|2016-01-28|190|1
5|Ice Cream|2014-09-23|300|7
5|Pasta|2015-06-30|65|2
PATH_TO_FILE="file:///u/vikrant/OrderInputFile"
Read the file into an RDD and skip the header:
RDD = sc.textFile(PATH_TO_FILE)
header = RDD.first()
newRDD = RDD.filter(lambda x: x != header)
Now let's repartition the RDD into 5 partitions:
partitionRDD = newRDD.repartition(5)
Let's have a look at how the data is distributed across these 5 partitions:
print("Partitions structure: {}".format(partitionRDD.glom().collect()))
Here you can see that the data is written into only two partitions, three partitions are empty, and the data is not distributed uniformly:
Partitions structure: [[],
[u'1|Gas|2018-01-17|1895|1', u'1|Air Conditioners|2018-01-28|19000|3', u'1|Television|2018-01-11|45000|2', u'2|Gas|2018-01-17|1895|1', u'2|Air Conditioners|2017-01-28|19000|3', u'2|Gas|2016-01-17|2300|1', u'1|Bottle|2018-03-24|45|10', u'1|Cooking oil|2018-04-22|100|3', u'3|Inverter|2015-11-02|29000|1', u'3|Gas|2014-01-09|2300|1'],
[u'3|Television|2018-01-17|45000|2', u'4|Gas|2018-01-17|2300|1', u'4|Television$$|2018-01-17|45000|2', u'5|Medicine|2016-03-14|23.50|8', u'5|Cough Syrup|2016-01-28|190|1', u'5|Ice Cream|2014-09-23|300|7', u'5|Pasta|2015-06-30|65|2'],
[], []]
We need to create a pair RDD in order to have the data distributed across the partitions by key.
Let's create a pair RDD by breaking each line into a (key, value) pair:
pairRDD = newRDD.map(lambda x: (x[0], x[1:]))  # key = first character (the OrderId), value = rest of the line
Now let's partition this RDD into 5 partitions, distributing the data by the key at position [0]:
newpairRDD = pairRDD.partitionBy(5, lambda k: int(k[0]))
Now we can see that the data is distributed across the partitions according to the key:
print("Partitions structure: {}".format(newpairRDD.glom().collect()))
Partitions structure: [
[(u'5', u'|Medicine|2016-03-14|23.50|8'),
(u'5', u'|Cough Syrup|2016-01-28|190|1'),
(u'5', u'|Ice Cream|2014-09-23|300|7'),
(u'5', u'|Pasta|2015-06-30|65|2')],
[(u'1', u'|Gas|2018-01-17|1895|1'),
(u'1', u'|Air Conditioners|2018-01-28|19000|3'),
(u'1', u'|Television|2018-01-11|45000|2'),
(u'1', u'|Bottle|2018-03-24|45|10'),
(u'1', u'|Cooking oil|2018-04-22|100|3')],
[(u'2', u'|Gas|2018-01-17|1895|1'),
(u'2', u'|Air Conditioners|2017-01-28|19000|3'),
(u'2', u'|Gas|2016-01-17|2300|1')],
[(u'3', u'|Inverter|2015-11-02|29000|1'),
(u'3', u'|Gas|2014-01-09|2300|1'),
(u'3', u'|Television|2018-01-17|45000|2')],
[(u'4', u'|Gas|2018-01-17|2300|1'),
(u'4', u'|Television$$|2018-01-17|45000|2')]
]
Below you can verify the number of records in each partition:
partitionSizes = newpairRDD.glom().map(len).collect()
[4, 5, 3, 3, 2]
Please note that the partition function passed to partitionBy() must return an int, which is why the key is converted with int() in the lambda above; otherwise you will get an error.
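As an aside, partition sizes can also be inspected through the DataFrame API with spark_partition_id(). A minimal sketch (the toDF column names key and value are placeholders, an active SparkSession is assumed, and the conversion itself should not shuffle the data):
from pyspark.sql.functions import spark_partition_id
# tag each row with its partition id and count rows per partition
df = newpairRDD.toDF(["key", "value"])
df.withColumn("pid", spark_partition_id()).groupBy("pid").count().orderBy("pid").show()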
Hope this helps!
answered Nov 27 '18 at 14:23 by vikrant rana
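To get the strict one-key-per-partition layout asked about in the question, one option (a sketch built on the example above, not part of the original answer; the names distinctKeys, keyIndex and exclusiveRDD are made up) is to index the distinct keys and pass a partitioner that sends every key to its own partition:
distinctKeys = pairRDD.keys().distinct().collect()
keyIndex = {k: i for i, k in enumerate(distinctKeys)}  # one partition index per key
exclusiveRDD = pairRDD.partitionBy(len(keyIndex), lambda k: keyIndex[k])
# each partition now holds exactly one key; asking for extra partitions,
# e.g. partitionBy(len(keyIndex) + 2, ...), simply leaves them empty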
For an RDD, have you tried using partitionBy to partition the RDD by key, like in this question? You can specify the number of partitions to be the number of keys to get rid of the empty partitions if desired.
In the Dataset API, you can use repartition with a Column as an argument to partition by the values in that column (although note that this uses the value of spark.sql.shuffle.partitions as the number of partitions, so you'll get a lot more empty partitions).
answered Nov 20 '18 at 18:27 by Jason
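A minimal sketch of the Dataset/DataFrame variant described above (the DataFrame df and the column name key are placeholders, not from the answer):
from pyspark.sql.functions import col
# hash-partition by the values of a column; by default the number of output
# partitions is spark.sql.shuffle.partitions, so many of them may be empty
byKeyDefault = df.repartition(col("key"))
# an explicit partition count can be passed instead
byKeyExplicit = df.repartition(100, col("key"))
# note that hash partitioning can still place several keys in the same partition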