Я пытаюсь прочитать файлы, присутствующие на Sequence
путях в scala. Ниже приведен пример (псевдо) кода:
val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)
Теперь в приведенном выше порядке одни пути существуют, а другие нет. Есть ли способ игнорировать отсутствующие пути при чтении parquet
файлов (чтобы избежать org.apache.spark.sql.AnalysisException: Path does not exist
)?
Я попробовал следующее, и, похоже, это работает, но в конце концов я дважды прочитал один и тот же путь, которого хотел бы избежать:
val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)
Я проверил options
метод, DataFrameReader
но, похоже, нет такой опции, как ignore_if_missing
.
Кроме того, эти пути могут быть hdfs
либо s3
( Seq
это передается как аргумент метода), и при чтении я не знаю, можно s3
ли hdfs
использовать путь s3
или hdfs
конкретный API для проверки существования.
Решение
Вы можете отфильтровать ненужные файлы, как в ответе @Psidom. В Spark лучший способ сделать это — использовать внутреннюю конфигурацию Spark для Hadoop. Учитывая, что переменная сеанса Spark называется «spark», вы можете сделать:
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
def testDirExist(path: String): Boolean = {
val p = new Path(path)
hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory
}
val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)
Комментариев нет:
Отправить комментарий