""" Convenience functions for loading point clouds in various formats """
import json
import os

import numpy as np
import pdal
# Column indices read by default from delimited text files: the first six
# columns, conventionally (x, y, z, r, g, b).
DEFAULT_COLUMN_INDICES = range(6)
# PDAL dimension names read by default from LAS/LAZ files.
DEFAULT_COLUMN_NAMES = ["X", "Y", "Z", "Red", "Green", "Blue"]
# Header line written by write_pdal when staging data through a temporary CSV.
DEFAULT_HEADER = "X,Y,Z,Red,Green,Blue"
def read_data(filename, usecols=None, userows=None, nrows=None):
    """Load a point cloud file as a numpy array, dispatching on extension.

    Parameters
    ----------
    filename: str
        Path to the point cloud file. Format is inferred from the extension
        (case-insensitive): .csv/.txt are read as text, .las/.laz via PDAL.
    usecols: list
        List of column indices (text file) or names (LAS file) to load
        Default: First six columns, e.g., (x, y, z, r, g, b)
    userows: list
        List of rows to load. Overrides nrows argument
        Default: All rows
    nrows: int
        Number of random rows to load. Ignored if userows is given.
        Default: Not used.

    Returns
    -------
    A data array of shape (nrows x ncols)

    Raises
    ------
    ValueError
        If the filename extension is none of .csv, .txt, .las, .laz.
    """
    # Compare extensions case-insensitively so e.g. ".LAS" is accepted too.
    lowered = filename.lower()
    if lowered.endswith((".csv", ".txt")):
        if usecols is None:
            usecols = DEFAULT_COLUMN_INDICES
        return read_txt(filename, usecols=usecols, userows=userows, nrows=nrows)
    if lowered.endswith((".las", ".laz")):
        if usecols is None:
            usecols = DEFAULT_COLUMN_NAMES
        return read_las(filename, usecols=usecols, userows=userows, nrows=nrows)
    # Note the trailing space before the continued string literal: without it
    # the message read "CSV file(.txt or .csv)".
    raise ValueError(
        "Unsupported format provided. Please provide a CSV file "
        "(.txt or .csv) or LAS/LAZ file."
    )
def read_txt(filename, usecols=None, userows=None, nrows=None):
    """Load a point cloud from a delimited text file as a numpy array.

    Values are returned as strings (no numeric conversion is performed);
    convert with ``arr.astype(float)`` if needed.

    Parameters
    ----------
    filename: str
        Filename of text file containing comma-separated point data.
    usecols: list
        List of column indices to load
        Default: First six columns, e.g., (x, y, z, r, g, b)
    userows: list
        List of rows to load. Overrides nrows argument
        Default: All rows
    nrows: int
        Number of distinct random rows to load (sampled without
        replacement, so it must not exceed the number of lines in the
        file). Ignored if userows is given.
        Default: Not used.

    Returns
    -------
    A data array of shape (nrows x ncols)
    """
    if usecols is None:
        usecols = DEFAULT_COLUMN_INDICES
    if userows is None:
        # Count lines first so we know the valid row-index range.
        # sum() also handles an empty file cleanly (0 lines).
        with open(filename, "r") as f:
            nlines = sum(1 for _ in f)
        if nrows is not None:
            # Sample WITHOUT replacement so exactly nrows distinct rows
            # are returned (with replacement, duplicates could shrink the
            # effective sample).
            userows = np.random.choice(nlines, size=int(nrows), replace=False)
        else:
            userows = range(nlines)
    # A set gives O(1) membership tests per line instead of scanning a
    # list/array for every line of the file.
    wanted = set(int(r) for r in userows)
    data = []
    with open(filename, "r") as f:
        for i, line in enumerate(f):
            if i in wanted:
                # strip() removes the trailing newline that would otherwise
                # contaminate the last field of every row.
                fields = np.asarray(line.strip().split(","))
                data.append(fields[usecols])
    return np.array(data)
def read_las(filename, usecols=None, userows=None, nrows=None):
    """Load a point cloud from a LAS or LAZ file into a numpy array.

    Theoretically, any file with a PDAL reader can be read with read_las.

    Parameters
    ----------
    filename: str
        Filename of LAS or LAZ file containing point cloud
    usecols: list
        List of PDAL dimension names to load
        Default: ['X', 'Y', 'Z', 'Red', 'Green', 'Blue']
    userows: list
        List of rows to load. Overrides nrows argument
        Default: All rows
    nrows: int
        Number of distinct random rows to load (sampled without
        replacement). Ignored if userows is given.
        Default: Not used.

    Returns
    -------
    A data array of shape (nrows x ncols)
    """
    if usecols is None:
        usecols = DEFAULT_COLUMN_NAMES
    # Minimal pipeline: a bare filename lets PDAL infer the reader stage.
    pipeline = pdal.Pipeline('{"pipeline": ["' + filename + '"]}')
    pipeline.validate()
    pipeline.loglevel = 0
    pipeline.execute()
    # PDAL returns a structured array; fields are accessed by dimension name.
    points = pipeline.arrays[0]
    if userows is None:
        if nrows is None:
            userows = np.arange(points.shape[0])
        else:
            # Without replacement: exactly nrows distinct points.
            userows = np.random.choice(
                points.shape[0], size=int(nrows), replace=False
            )
    userows = np.asarray(userows)
    # Vectorized column extraction; also avoids the original
    # reshape(nrows, ...) which failed when userows was given without nrows.
    return np.hstack([points[key][userows].reshape(-1, 1) for key in usecols])
def write_dem(data, filename, resolution=1, radius=None):
    """Rasterize a point array to a DEM via PDAL's GDAL writer.

    Stages the points through a temporary LAS file ("temp.las" in the
    working directory), which is removed afterwards.

    Parameters
    ----------
    data: numpy.ndarray
        Point array whose columns match DEFAULT_HEADER.
    filename: str
        Output raster filename.
    resolution: float
        Output grid cell size. Default: 1.
    radius: float
        Search radius for gridding points.
        Default: resolution * sqrt(2), the diagonal of a grid cell.
    """
    if radius is None:
        radius = resolution * np.sqrt(2)
    write_las(data, "temp.las")
    # json.dumps produces well-formed JSON and escapes filenames correctly,
    # unlike manual string concatenation.
    pipeline_spec = json.dumps(
        {
            "pipeline": [
                {"type": "readers.las", "filename": "temp.las"},
                {
                    "type": "writers.gdal",
                    "resolution": str(resolution),
                    "radius": str(radius),
                    "filename": filename,
                },
            ]
        }
    )
    try:
        pipeline = pdal.Pipeline(pipeline_spec)
        pipeline.validate()
        pipeline.loglevel = 0
        pipeline.execute()
    finally:
        # Remove the intermediate LAS file even if the pipeline fails.
        os.remove("temp.las")
def write_las(arr, filename):
    """Write a point array to a LAS file (delegates to :func:`write_pdal`)."""
    write_pdal(arr, filename, writer="writers.las")
def write_pdal(arr, filename, writer, header=DEFAULT_HEADER):
    """Write a point array to a file using an arbitrary PDAL writer stage.

    The array is staged through a temporary CSV file ("temp.csv" in the
    working directory) so any PDAL writer can consume it; the temp file is
    removed afterwards.

    Parameters
    ----------
    arr: numpy.ndarray
        Point array; columns must match ``header``.
    filename: str
        Output filename passed to the PDAL writer.
    writer: str
        PDAL writer stage type, e.g. "writers.las".
    header: str
        Comma-separated column names written as the CSV header.
        Default: DEFAULT_HEADER.
    """
    np.savetxt("temp.csv", arr, header=header, delimiter=",", comments="")
    # json.dumps produces well-formed JSON and escapes filenames correctly,
    # unlike manual string concatenation.
    pipeline_spec = json.dumps(
        {
            "pipeline": [
                {"type": "readers.text", "filename": "temp.csv"},
                {"type": writer, "filename": filename},
            ]
        }
    )
    try:
        pipeline = pdal.Pipeline(pipeline_spec)
        pipeline.validate()
        pipeline.loglevel = 0
        pipeline.execute()
    finally:
        # Remove the intermediate CSV even if the pipeline fails.
        os.remove("temp.csv")