parallel execution of dask `DataFrame.set_index()`
I am trying to create an index on a large dask dataframe. No matter which scheduler I use, I am unable to utilize more than the equivalent of one core for the operation. The code is:
import dask.dataframe as ddf  # assuming ddf refers to the dask.dataframe module

(ddf
 .read_parquet(pq_in)
 .set_index('title', drop=True, npartitions='auto', shuffle='disk', compute=False)
 .to_parquet(pq_out, engine='fastparquet', object_encoding='json', write_index=True, compute=False)
 .compute(scheduler=my_scheduler)
)
I am running this on a single 64-core machine. What can I do to utilize more cores? Or is set_index
inherently sequential?
dataframe concurrency parallel-processing dask dask-distributed
asked Nov 21 '18 at 22:44
Daniel Mahler
1 Answer
That should use multiple cores, though shuffling through disk can introduce other bottlenecks, such as your local hard drive; often the operation is not bound by CPU, so additional cores go unused.
In your situation I would use the distributed scheduler on a single machine, so that you can use the diagnostic dashboard to get more insight into your computation.
answered Nov 22 '18 at 5:04
MRocklin
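A minimal sketch of that suggestion, assuming ddf is the dask.dataframe module and pq_in/pq_out stand in for the question's paths (the worker counts are illustrative, not prescriptive):

from dask.distributed import Client
import dask.dataframe as ddf

pq_in, pq_out = 'in.parquet', 'out.parquet'  # placeholder paths from the question

# Starting a Client makes the distributed scheduler the default and
# serves the diagnostic dashboard (typically at http://localhost:8787).
client = Client(n_workers=8, threads_per_worker=8)  # illustrative sizing for a 64-core box

task = (ddf
        .read_parquet(pq_in)
        .set_index('title', drop=True, npartitions='auto', shuffle='disk', compute=False)
        .to_parquet(pq_out, engine='fastparquet', object_encoding='json',
                    write_index=True, compute=False))
task.compute()  # runs on the local cluster; watch progress in the dashboard

The dashboard makes it easy to see whether time is going to CPU work, to disk I/O from the shuffle, or to scheduler overhead.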
Using the distributed scheduler and setting shuffle='disk' improves parallelism, but seems to make dask try to load all the data into memory. Is it possible to do a parallel shuffle with larger-than-memory data?
– Daniel Mahler
Nov 23 '18 at 3:41
Actually my data does fit into memory. The problem is that the distributed scheduler seems to be loading the whole dataset into each worker process.
– Daniel Mahler
Nov 23 '18 at 4:39
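If the distributed scheduler does pull too much data into worker memory, per-worker memory can be capped when the local cluster is created, so workers spill to local disk instead of accumulating the whole dataset in RAM. A sketch; the 8 GB limit and worker counts are illustrative assumptions:

from dask.distributed import Client

# Cap each worker process; data beyond the limit is spilled to local disk
# rather than being held in memory.
client = Client(n_workers=8, threads_per_worker=8, memory_limit='8GB')  # illustrative values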