Spark RDD partition by key in exclusive way

I would like to partition an RDD by key so that each partition contains only the values of a single key. For example, if I have 100 different values of the key and I repartition(102), the RDD should have 2 empty partitions and 100 partitions each containing the values of a single key.



I tried with groupByKey(k).repartition(102), but this does not guarantee the exclusivity of a key in each partition: I see some partitions containing the values of more than one key, and more than 2 empty partitions.



Is there a way in the standard API to do this?
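For reference, here is a minimal reproduction of what I tried (the toy keys and values below are made up just to illustrate the problem):

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# 100 distinct keys, a few values each
rdd = sc.parallelize([(k, v) for k in range(100) for v in range(3)])

# what I tried: group by key, then repartition to 102
grouped = rdd.groupByKey().repartition(102)

# count how many distinct keys ended up in each partition
keys_per_partition = grouped.glom().map(lambda part: len({k for k, _ in part})).collect()
print(keys_per_partition)  # some partitions hold several keys, and more than 2 are empty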

apache-spark pyspark rdd

asked Nov 19 '18 at 21:56 by alexlipa

  • was my answer helpful on this? – vikrant rana, Dec 28 '18 at 12:38

2 Answers

To use partitionBy(), the RDD must consist of tuple (key, value) pairs. Let's see an example below.



Suppose I have an input file with the following data:



OrderId|OrderItem|OrderDate|OrderPrice|ItemQuantity
1|Gas|2018-01-17|1895|1
1|Air Conditioners|2018-01-28|19000|3
1|Television|2018-01-11|45000|2
2|Gas|2018-01-17|1895|1
2|Air Conditioners|2017-01-28|19000|3
2|Gas|2016-01-17|2300|1
1|Bottle|2018-03-24|45|10
1|Cooking oil|2018-04-22|100|3
3|Inverter|2015-11-02|29000|1
3|Gas|2014-01-09|2300|1
3|Television|2018-01-17|45000|2
4|Gas|2018-01-17|2300|1
4|Television$$|2018-01-17|45000|2
5|Medicine|2016-03-14|23.50|8
5|Cough Syrup|2016-01-28|190|1
5|Ice Cream|2014-09-23|300|7
5|Pasta|2015-06-30|65|2

PATH_TO_FILE="file:///u/vikrant/OrderInputFile"


Read the file into an RDD, skipping the header:



RDD = sc.textFile(PATH_TO_FILE)
header = RDD.first()
newRDD = RDD.filter(lambda x: x != header)


Now let's repartition the RDD into 5 partitions:



partitionRDD = newRDD.repartition(5)


Let's have a look at how the data is distributed across these 5 partitions:



print("Partitions structure: {}".format(partitionRDD.glom().collect()))


Here you can see that the data ended up in only two partitions, three of them are empty, and the records are not distributed uniformly.



Partitions structure: [[],
[u'1|Gas|2018-01-17|1895|1', u'1|Air Conditioners|2018-01-28|19000|3', u'1|Television|2018-01-11|45000|2', u'2|Gas|2018-01-17|1895|1', u'2|Air Conditioners|2017-01-28|19000|3', u'2|Gas|2016-01-17|2300|1', u'1|Bottle|2018-03-24|45|10', u'1|Cooking oil|2018-04-22|100|3', u'3|Inverter|2015-11-02|29000|1', u'3|Gas|2014-01-09|2300|1'],
[u'3|Television|2018-01-17|45000|2', u'4|Gas|2018-01-17|2300|1', u'4|Television$$|2018-01-17|45000|2', u'5|Medicine|2016-03-14|23.50|8', u'5|Cough Syrup|2016-01-28|190|1', u'5|Ice Cream|2014-09-23|300|7', u'5|Pasta|2015-06-30|65|2'],
[], []]


We need to create a pair RDD in order to have the data distributed by key across the partitions.
Let's create a pair RDD by splitting each record into a key and a value:



pairRDD = newRDD.map(lambda x: (x[0], x[1:]))  # key = first character (the OrderId; works here because the ids are single digits), value = the rest of the line


Now let's repartition this RDD into 5 partitions, distributing the data by the key (the OrderId at position [0]) using a custom partition function:



newpairRDD = pairRDD.partitionBy(5, lambda k: int(k[0]))  # the partition function must return an int; Spark takes it modulo the number of partitions


Now we can see that the data is distributed by key, with each partition holding the records of a single OrderId:



print("Partitions structure: {}".format(newpairRDD.glom().collect()))
Partitions structure: [
[(u'5', u'|Medicine|2016-03-14|23.50|8'),
(u'5', u'|Cough Syrup|2016-01-28|190|1'),
(u'5', u'|Ice Cream|2014-09-23|300|7'),
(u'5', u'|Pasta|2015-06-30|65|2')],

[(u'1', u'|Gas|2018-01-17|1895|1'),
(u'1', u'|Air Conditioners|2018-01-28|19000|3'),
(u'1', u'|Television|2018-01-11|45000|2'),
(u'1', u'|Bottle|2018-03-24|45|10'),
(u'1', u'|Cooking oil|2018-04-22|100|3')],

[(u'2', u'|Gas|2018-01-17|1895|1'),
(u'2', u'|Air Conditioners|2017-01-28|19000|3'),
(u'2', u'|Gas|2016-01-17|2300|1')],

[(u'3', u'|Inverter|2015-11-02|29000|1'),
(u'3', u'|Gas|2014-01-09|2300|1'),
(u'3', u'|Television|2018-01-17|45000|2')],

[(u'4', u'|Gas|2018-01-17|2300|1'),
(u'4', u'|Television$$|2018-01-17|45000|2')]
]


Below you can verify the number of records in each partition:



partitionSizes = newpairRDD.glom().map(len).collect()
print(partitionSizes)
# [4, 5, 3, 3, 2]
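As a side note, if you turn the pair RDD into a DataFrame you can also count records per partition with spark_partition_id. This is only a sketch: it assumes a SparkSession is already active, and the column names are made up:

from pyspark.sql.functions import spark_partition_id

df = newpairRDD.toDF(["OrderId", "rest"])
df.withColumn("pid", spark_partition_id()) \
  .groupBy("pid") \
  .count() \
  .orderBy("pid") \
  .show()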


Please note that the partition function you pass to partitionBy must return an int (the target partition index); here the keys are strings, so they are converted with int() inside the lambda, and a key that cannot be converted that way will raise an error.
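If your keys are not numeric at all, one option (a sketch, not part of the example above) is to collect the distinct keys once and build an explicit key-to-partition mapping; this also gives you exactly one key per partition, which is what the question asks for:

# assign every distinct key its own partition index
keys = pairRDD.keys().distinct().collect()
key_to_index = {k: i for i, k in enumerate(keys)}

exclusiveRDD = pairRDD.partitionBy(len(keys), lambda k: key_to_index[k])

# every partition now holds the values of at most one key
print(exclusiveRDD.glom().map(lambda part: {k for k, _ in part}).collect())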



Hope this helps!

answered Nov 27 '18 at 14:23 by vikrant rana

For an RDD, have you tried using partitionBy to partition the RDD by key, like in this question? You can specify the number of partitions to be the number of keys to get rid of the empty partitions, if desired.
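A minimal sketch of that suggestion, assuming rdd is already a pair RDD (note that with the default hash partitioner two keys can still land in the same partition):

# use exactly as many partitions as there are distinct keys
num_keys = rdd.keys().distinct().count()
partitioned = rdd.partitionBy(num_keys)

print(partitioned.glom().map(len).collect())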



In the Dataset API, you can use repartition with a Column as an argument to partition by the values in that column (although note that this uses the value of spark.sql.shuffle.partitions as the number of partitions, so you'll get a lot more empty partitions).
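And a sketch of the Dataset/DataFrame variant, where df and its key column are made up; passing an explicit partition count avoids falling back to spark.sql.shuffle.partitions:

from pyspark.sql.functions import col, spark_partition_id

repartitioned = df.repartition(100, col("key"))

# inspect how many rows each partition received
repartitioned.groupBy(spark_partition_id().alias("pid")).count().orderBy("pid").show()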

answered Nov 20 '18 at 18:27 by Jason