Changing aws credentials in hadoop configuration for pyspark during runtime after initialization of spark...
I've looked around on Stack Overflow for solutions to related problem, but it seems that this one is fairly unique. For context, I need to refresh AWS security credentials every hour due to company procedures, and I'm struggling to add the new refreshed security credentials to spark. Everything works fine in the first hour (I can access and read tables from s3, etc), but I'm unable to successfully change my aws credentials after the first hour is up and the credentials are refreshed.
Once I refresh my aws credentials, here is the code that I'm using to update spark to make them use the new aws credentials:
sc = spark.sparkContext
def getAWSKeys(profile):
awsCreds = {}
Config = ConfigParser.ConfigParser()
Config.read(os.path.join(os.getenv("HOME"), '.aws', 'credentials'))
if profile in Config.sections():
awsCreds["aws_access_key_id"] = Config.get(
profile, "aws_access_key_id")
awsCreds["aws_secret_access_key"] = Config.get(
profile, "aws_secret_access_key")
awsCreds["aws_session_token"] = Config.get(
profile, "aws_session_token")
return awsCreds
awsKeys = getAWSKeys(profile)
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3n.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3n.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.endpoint",
"s3.us-east-1.amazonaws.com")
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3n.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3a.enableV4", "true")
# sc._jsc.hadoopConfiguration().set("fs.s3.aws.credentials.provider",
# "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
os.environ['AWS_ACCESS_KEY_ID'] = awsKeys["aws_access_key_id"]
os.environ['AWS_SECRET_ACCESS_KEY'] = awsKeys["aws_secret_access_key"]
os.environ['AWS_SESSION_TOKEN'] = awsKeys["aws_session_token"]
I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:
Py4JJavaError Traceback (most recent call last)
<ipython-input-57-674174eca978> in <module>()
3 table = (
4 spark.read.option("delimiter", "|")
----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
6 .select("col1", "col2", "col3", "col4")
7 )
/usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o12923.csv.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error when I refresh the aws credentials. I've attempted to add these new aws credentials to spark, but nothing that I've tried has worked.
python amazon-web-services apache-spark amazon-s3 pyspark
add a comment |
I've looked around on Stack Overflow for solutions to related problem, but it seems that this one is fairly unique. For context, I need to refresh AWS security credentials every hour due to company procedures, and I'm struggling to add the new refreshed security credentials to spark. Everything works fine in the first hour (I can access and read tables from s3, etc), but I'm unable to successfully change my aws credentials after the first hour is up and the credentials are refreshed.
Once I refresh my aws credentials, here is the code that I'm using to update spark to make them use the new aws credentials:
sc = spark.sparkContext
def getAWSKeys(profile):
awsCreds = {}
Config = ConfigParser.ConfigParser()
Config.read(os.path.join(os.getenv("HOME"), '.aws', 'credentials'))
if profile in Config.sections():
awsCreds["aws_access_key_id"] = Config.get(
profile, "aws_access_key_id")
awsCreds["aws_secret_access_key"] = Config.get(
profile, "aws_secret_access_key")
awsCreds["aws_session_token"] = Config.get(
profile, "aws_session_token")
return awsCreds
awsKeys = getAWSKeys(profile)
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3n.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3n.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.endpoint",
"s3.us-east-1.amazonaws.com")
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3n.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3a.enableV4", "true")
# sc._jsc.hadoopConfiguration().set("fs.s3.aws.credentials.provider",
# "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
os.environ['AWS_ACCESS_KEY_ID'] = awsKeys["aws_access_key_id"]
os.environ['AWS_SECRET_ACCESS_KEY'] = awsKeys["aws_secret_access_key"]
os.environ['AWS_SESSION_TOKEN'] = awsKeys["aws_session_token"]
I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:
Py4JJavaError Traceback (most recent call last)
<ipython-input-57-674174eca978> in <module>()
3 table = (
4 spark.read.option("delimiter", "|")
----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
6 .select("col1", "col2", "col3", "col4")
7 )
/usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o12923.csv.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error when I refresh the aws credentials. I've attempted to add these new aws credentials to spark, but nothing that I've tried has worked.
python amazon-web-services apache-spark amazon-s3 pyspark
add a comment |
I've looked around on Stack Overflow for solutions to related problem, but it seems that this one is fairly unique. For context, I need to refresh AWS security credentials every hour due to company procedures, and I'm struggling to add the new refreshed security credentials to spark. Everything works fine in the first hour (I can access and read tables from s3, etc), but I'm unable to successfully change my aws credentials after the first hour is up and the credentials are refreshed.
Once I refresh my aws credentials, here is the code that I'm using to update spark to make them use the new aws credentials:
sc = spark.sparkContext
def getAWSKeys(profile):
awsCreds = {}
Config = ConfigParser.ConfigParser()
Config.read(os.path.join(os.getenv("HOME"), '.aws', 'credentials'))
if profile in Config.sections():
awsCreds["aws_access_key_id"] = Config.get(
profile, "aws_access_key_id")
awsCreds["aws_secret_access_key"] = Config.get(
profile, "aws_secret_access_key")
awsCreds["aws_session_token"] = Config.get(
profile, "aws_session_token")
return awsCreds
awsKeys = getAWSKeys(profile)
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3n.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3n.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.endpoint",
"s3.us-east-1.amazonaws.com")
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3n.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3a.enableV4", "true")
# sc._jsc.hadoopConfiguration().set("fs.s3.aws.credentials.provider",
# "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
os.environ['AWS_ACCESS_KEY_ID'] = awsKeys["aws_access_key_id"]
os.environ['AWS_SECRET_ACCESS_KEY'] = awsKeys["aws_secret_access_key"]
os.environ['AWS_SESSION_TOKEN'] = awsKeys["aws_session_token"]
I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:
Py4JJavaError Traceback (most recent call last)
<ipython-input-57-674174eca978> in <module>()
3 table = (
4 spark.read.option("delimiter", "|")
----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
6 .select("col1", "col2", "col3", "col4")
7 )
/usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o12923.csv.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error when I refresh the aws credentials. I've attempted to add these new aws credentials to spark, but nothing that I've tried has worked.
python amazon-web-services apache-spark amazon-s3 pyspark
I've looked around on Stack Overflow for solutions to related problem, but it seems that this one is fairly unique. For context, I need to refresh AWS security credentials every hour due to company procedures, and I'm struggling to add the new refreshed security credentials to spark. Everything works fine in the first hour (I can access and read tables from s3, etc), but I'm unable to successfully change my aws credentials after the first hour is up and the credentials are refreshed.
Once I refresh my aws credentials, here is the code that I'm using to update spark to make them use the new aws credentials:
sc = spark.sparkContext
def getAWSKeys(profile):
awsCreds = {}
Config = ConfigParser.ConfigParser()
Config.read(os.path.join(os.getenv("HOME"), '.aws', 'credentials'))
if profile in Config.sections():
awsCreds["aws_access_key_id"] = Config.get(
profile, "aws_access_key_id")
awsCreds["aws_secret_access_key"] = Config.get(
profile, "aws_secret_access_key")
awsCreds["aws_session_token"] = Config.get(
profile, "aws_session_token")
return awsCreds
awsKeys = getAWSKeys(profile)
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3a.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint",
"s3.us-east-1.amazonaws.com")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.session.token",
awsKeys["aws_session_token"])
sc._jsc.hadoopConfiguration().set("fs.s3n.enableServerSideEncryption", "true")
sc._jsc.hadoopConfiguration().set("fs.s3n.access.key",
awsKeys["aws_access_key_id"])
sc._jsc.hadoopConfiguration().set("fs.s3n.secret.key",
awsKeys["aws_secret_access_key"])
sc._jsc.hadoopConfiguration().set("fs.s3n.endpoint",
"s3.us-east-1.amazonaws.com")
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3n.enableV4", "true")
sc.setSystemProperty("com.amazonaws.services.s3a.enableV4", "true")
# sc._jsc.hadoopConfiguration().set("fs.s3.aws.credentials.provider",
# "org.apache.hadoop.fs.s3.TemporaryAWSCredentialsProvider")
os.environ['AWS_ACCESS_KEY_ID'] = awsKeys["aws_access_key_id"]
os.environ['AWS_SECRET_ACCESS_KEY'] = awsKeys["aws_secret_access_key"]
os.environ['AWS_SESSION_TOKEN'] = awsKeys["aws_session_token"]
I've attempted to be exhaustive in my approach, but sadly nothing has worked. The error that I get is:
Py4JJavaError Traceback (most recent call last)
<ipython-input-57-674174eca978> in <module>()
3 table = (
4 spark.read.option("delimiter", "|")
----> 5 .csv(f"s3n://{s3_path}/{file1}", header = True, inferSchema=True)
6 .select("col1", "col2", "col3", "col4")
7 )
/usr/lib/spark/python/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o12923.csv.
: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: 9A4F6DDEA3BD8AA6), S3 Extended Request ID: xg9ZiPjfV3h4rGgs5emsUiWl8xQdv0OMhK/91qdAs/iIvapWgIlWh9m1qLTGj3ODFM9MtEnuueg=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1588)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1258)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1030)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:742)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:716)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4169)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4116)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1237)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:24)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.GetObjectMetadataCall.perform(GetObjectMetadataCall.java:10)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:82)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:94)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:39)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:211)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy36.retrieveMetadata(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:768)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:311)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
To reiterate, everything works fine in the first hour, but I get the 400 Bad Request error when I refresh the aws credentials. I've attempted to add these new aws credentials to spark, but nothing that I've tried has worked.
python amazon-web-services apache-spark amazon-s3 pyspark
python amazon-web-services apache-spark amazon-s3 pyspark
asked Nov 20 '18 at 18:00
nattyjinattyji
31
31
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.
If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider
which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work
The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.
Try that first.
ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "1"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53398881%2fchanging-aws-credentials-in-hadoop-configuration-for-pyspark-during-runtime-afte%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.
If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider
which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work
The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.
Try that first.
ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid
add a comment |
I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.
If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider
which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work
The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.
Try that first.
ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid
add a comment |
I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.
If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider
which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work
The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.
Try that first.
ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid
I can't see an easy way of doing this, as those credentials get bonded to the filesystem and then frozen.
If I were trying to do this, I'd write my own implementation of AWSCredentialsProvider
which provides credentials for AWS calls. the default chain is something like: spark config, env vars, GET request to EC2 metadata service. You could add a new one which somehow picked up new values. You'd need to come up with a way of propagating the new session credentials to every host in the cluster though...hard work
The other thing is to know that AWS Assumed Roles have had their max life bumped up from 1 hour to 12 hours, so if you can get your IT team to increase the role you get assigned to 12 hours, you may just be able to get through the day.
Try that first.
ps: CSV "inferSchema=true" means "read through the entire CSV file once just to work out the schema". Avoid
answered Nov 22 '18 at 14:03
Steve LoughranSteve Loughran
5,46711418
5,46711418
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53398881%2fchanging-aws-credentials-in-hadoop-configuration-for-pyspark-during-runtime-afte%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown