22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402 | class StarField(object):
"""Class to contain image data and lists of clusters and stars"""
def __init__(self, fits_file):
self.fits_file = fits_file
self.fits_file_basename = os.path.basename(self.fits_file)
self.fits_file_dirname = os.path.dirname(self.fits_file)
self.read_file()
def has_wcs_header(self):
return "WCSAXES" in self.header
def read_file(self):
"""Read the fits_file, store the contents, and initialise data"""
logger.info(f"Reading file {self.fits_file}")
with fits.open(self.fits_file) as hdu_list:
# Convert the full image two a 2-D array with dtype int32
self.hdu_list = hdu_list
self.n_ij = hdu_list[0].data.astype(np.int32)
self.header = hdu_list[0].header
self.data_shape = self.n_ij.shape
self.b_ij = np.empty(self.data_shape, dtype=np.float64)
self.pixel_variance = np.empty(self.data_shape, dtype=np.float64)
self.source_mark = np.empty(self.data_shape, dtype=bool)
self.in_usable_region = np.ones(self.data_shape, dtype=bool)
self.sigma_background = None
self.mu_background = None
self.cluster_number = np.empty(self.data_shape, dtype=np.int32)
self.cluster_number.fill(-1) # Initially no pixels belong to a cluster
self.cluster_list = []
self.star_list = []
def threshold_hot_pixels(self, data, quantile=0.999):
data = data.copy()
maxval = np.quantile(data, quantile)
data[data > maxval] = maxval
return data
def setup_sky_coordinates(self, astronometry_api_key=None):
self.astronometry_api_key = astronometry_api_key
warnings.simplefilter("ignore") # suppress FITSFixedWarning
if "GEO_LAT" in self.header and "GEO_LONG" in self.header:
self.lat = self.header["GEO_LAT"]
self.lon = self.header["GEO_LONG"]
else:
# use location of RHUL observatory
self.lat = 51.42688295361113
self.lon = -0.5632737048924312
self.observation_date_str = self.header["DATE-OBS"]
observation_date = Time(self.observation_date_str, format="isot", scale="utc")
if self.has_wcs_header():
logger.info("Using WCS information from the fits file header")
wcs_header = self.header
elif astronometry_api_key is not None:
try:
logger.info("Attempting to extract WCS information from astrometry.net")
from astroquery.astrometry_net import AstrometryNet
ast = AstrometryNet()
ast.api_key = self.astronometry_api_key
wcs_header = ast.solve_from_image(self.fits_file, verbose=False)
if wcs_header:
logger.info("Succesfully extracted WCS information from astrometry.net")
logger.info("Updating fits file with WCS: overwriting")
self.header.update(wcs_header)
self.hdu_list.writeto(self.fits_file, overwrite=True)
else:
logger.info("Failed to extract WCS information from astrometry.net")
except Exception as e:
logger.info(f"Failed to obtain data from astronometry.net due to {e}")
wcs_header = {}
else:
logger.warning("Unable to obtain WCS information: please provide astronometry_api_key")
wcs_header = None
if wcs_header:
# Calculate the ra and dec
numCol = self.header["NAXIS1"]
numRow = self.header["NAXIS2"]
self.ra, self.dec = WCS(wcs_header).all_pix2world(
(numCol - 1) / 2.0, (numRow - 1) / 2.0, 1
)
self.ra = float(self.ra)
self.dec = float(self.dec)
# Calculate the alt and az
sky_coord = SkyCoord(
ra=self.ra * units.deg, dec=self.dec * units.deg, frame="icrs"
)
location = EarthLocation(lat=self.lat * units.deg, lon=self.lon * units.deg)
altaz_frame = AltAz(obstime=observation_date, location=location)
altaz = sky_coord.transform_to(altaz_frame)
self.alt = altaz.alt.deg
self.az = altaz.az.deg
print(f"Coordinate data for {self.fits_file}:")
print(f"Right Ascension (RA): {self.ra} degrees")
print(f"Declination (Dec): {self.dec} degrees")
print(f"Latitude: {self.lat} degrees")
print(f"Longitude: {self.lon} degrees")
print(f"Date and Time (UTC): {observation_date}")
print(f"Altitude: {self.alt} degrees")
print(f"Azimuth: {self.az} degrees")
warnings.resetwarnings()
def get_sky_projection(self):
return WCS(self.header)
def setup_sky_ax(self, ax):
ax.coords.grid(color='white', alpha=0.2, linestyle='dashed', linewidth=0.5)
lon = ax.coords[0]
lat = ax.coords[1]
lon.set_major_formatter('hh:mm:ss')
lon.set_ticks(spacing=1. * units.arcmin)
lon.set_ticklabel(exclude_overlapping=True)
lon.set_axislabel("RA")
lat.set_major_formatter('dd:mm')
lat.set_ticks(spacing=1. * units.arcmin)
lat.set_ticklabel(exclude_overlapping=True)
lat.set_axislabel("DEC")
return lat, lon
def set_usable_region(self, left=50, right=50, top=50, bottom=50, corner_radius=300):
"""Define a region in which to accept stars using a boolean array
This will create a 2D boolean array which omits a border and rounded corners
"""
bL, bR, bT, bB = left, right, top, bottom
numRows, numCols = self.n_ij.shape
rows, cols = np.ogrid[:numRows, :numCols] # meshgrid of pixel coord
TL_row, TL_col = bT + corner_radius, bL + corner_radius
BL_row, BL_col = numRows - bB - corner_radius, bL + corner_radius
BR_row, BR_col = numRows - bB - corner_radius, numCols - bR - corner_radius
TR_row, TR_col = bT + corner_radius, numCols - bR - corner_radius
in_TL = (
(rows >= bT)
& (rows < bT + corner_radius)
& (cols >= bL)
& (cols < bL + corner_radius)
)
in_BL = (
(rows >= numRows - bB - corner_radius)
& (rows < numRows - bB)
& (cols >= bL)
& (cols < bL + corner_radius)
)
in_BR = (
(rows >= numRows - bB - corner_radius)
& (rows < numRows - bB)
& (cols >= numCols - bR - corner_radius)
& (cols < numCols - bR)
)
in_TR = (
(rows >= bT)
& (rows < bT + corner_radius)
& (cols >= numCols - bR - corner_radius)
& (cols < numCols - bR)
)
TL_r = np.sqrt((rows - TL_row) ** 2 + (cols - TL_col) ** 2)
BL_r = np.sqrt((rows - BL_row) ** 2 + (cols - BL_col) ** 2)
BR_r = np.sqrt((rows - BR_row) ** 2 + (cols - BR_col) ** 2)
TR_r = np.sqrt((rows - TR_row) ** 2 + (cols - TR_col) ** 2)
inRectangle = (
(rows >= bT) & (rows < numRows - bB) & (cols >= bL) & (cols < numCols - bR)
)
inCorners = (
(in_TL & (TL_r > corner_radius))
| (in_BL & (BL_r > corner_radius))
| (in_BR & (BR_r > corner_radius))
| (in_TR & (TR_r > corner_radius))
)
self.in_usable_region = inRectangle & ~inCorners
def add_usable_region_to_ax(self, ax, color="dodgerblue", linestyle="dashed"):
ifr = self.in_usable_region
boundary = np.zeros_like(ifr, dtype=np.uint8)
boundary[ifr] = 1
ax.contour(boundary, levels=[0.5], colors=color, linestyles=linestyle)
return ax
def estimate_background(
self,
box=151,
hole=0,
background_algorithm="gauss",
use_existing_background=True,
background_dir=None,
):
"""Estimate the background and store it in a fits file"""
if background_dir is None:
background_dir = self.fits_file_dirname
hb = int((box - 1) / 2) # half of box-1
hh = int(max((hole - 1) / 2, 0)) # half of hole-1 or zero
background_file_name = "_".join(
[self.fits_file_basename, str(box), str(hole), "bkg.fit"]
)
background_file_path = os.path.join(background_dir, background_file_name)
if use_existing_background and os.path.exists(background_file_path):
logger.info(f"Background file {background_file_path} exists: using this")
with fits.open(background_file_path) as file:
self.b_ij = file[0].data
else:
numRows, numCols = self.b_ij.shape
delta = 10 # first calculate background on reduced grid
rowsGrid = np.around(numRows / delta).astype(int)
colsGrid = np.around(numCols / delta).astype(int)
bGrid = np.empty([rowsGrid, colsGrid])
for rowGrid in tqdm.tqdm(range(rowsGrid)):
for colGrid in range(colsGrid):
i = rowGrid * delta
j = colGrid * delta
xlo = max(j - hb, 0)
xhi = min(j + hb + 1, numCols)
ylo = max(i - hb, 0)
yhi = min(i + hb + 1, numRows)
fitRegion = self.n_ij[ylo:yhi, xlo:xhi]
if hole > 0:
mask = np.full((yhi - ylo, xhi - xlo), True, dtype=bool)
mask[hb - hh : hb + hh + 1, hb - hh : hb + hh + 1] = False
fitRegion = np.extract(mask, fitRegion) # exclude hole
if background_algorithm == "median":
bGrid[rowGrid, colGrid] = np.median(fitRegion)
elif background_algorithm == "gauss":
bGrid[rowGrid, colGrid], sigma = gaussian_background_fit(
fitRegion
)
else:
bGrid[rowGrid, colGrid] = 0
# 2D spline to interpolate background from grid to full image
grid_x = np.arange(0, numRows, delta)
grid_y = np.arange(0, numCols, delta)
spline_interpolator = RectBivariateSpline(grid_x, grid_y, bGrid)
image_x = np.arange(numRows)
image_y = np.arange(numCols)
self.b_ij = spline_interpolator(image_x, image_y)
# Store the background (using the fits_file as a basis)
self.hdu_list[0].data = self.b_ij
self.hdu_list.writeto(background_file_path, overwrite=True)
# Also initialise sigma2_n (same shape as image n) containing
# variance for individual pixel from CCD model.
nCorr = (
self.n_ij - self.b_ij + np.median(self.b_ij)
) # correct for flatness
self.mu_0, self.sigma_0 = gaussian_background_fit(nCorr[self.in_usable_region])
G = 2.39 # for SBIG ST-8XME camera
self.pixel_variance = (
np.maximum(self.n_ij - self.b_ij, 0) / G + self.sigma_0**2
)
def find_clusters(self, threshold_factor=2.5):
""" Cluster the image to identify isolated stars
Find pixels over threshold, then group pixels sharing a common edge
into clusters. When cluster started, create a stack. Pop pixels off
the stack, look at their neighbours, push onto stack if over threshold,
iterate until stack empty.
"""
cluster_number = -1
num_rows, num_cols = self.data_shape
def is_valid_pixel(i, j):
return 0 <= i < num_rows and 0 <= j < num_cols
threshold = self.b_ij + threshold_factor * self.sigma_0
for i in range(num_rows):
for j in range(num_cols):
start_of_cluster = (
self.n_ij[i, j] > threshold[i, j] and self.cluster_number[i, j] < 0
)
if start_of_cluster:
cluster_number += 1
self.cluster_number[i, j] = cluster_number
cluster = Cluster(cluster_number, (i, j), self)
stack = deque([(i, j)])
while stack:
k, l = stack.pop()
directions = [(k - 1, l), (k + 1, l), (k, l - 1), (k, l + 1)]
for u, v in directions:
if is_valid_pixel(u, v):
if (
self.cluster_number[u, v] < 0
and self.n_ij[u, v] > threshold[u, v]
):
stack.append((u, v))
self.cluster_number[u, v] = cluster_number
cluster.add_pixel((u, v))
cluster.set_cluster_number(cluster_number)
self.cluster_list.append(cluster)
logger.info(f"Image {self.fits_file_basename} has {cluster_number + 1} clusters")
def process_cluster_list(self, minimum_pixels=6, sigma_cl_min=2.5, sigma_cl_max=5, rho_cl_max=0.1):
""" Process the cluster list
Accept clusters as star candidates if numPix >= minPix and
cluster width/shape within cut limits.
"""
numRows, numCols = self.data_shape
star_number = -1
self.mux_list = []
self.muy_list = []
self.star_list = []
for cluster in self.cluster_list:
covariance = cluster.cov()
sigma_x = -1.0
sigma_y = -1.0
rho_xy = 0.0
if covariance[0, 0] > 0 and covariance[1, 1] > 0:
sigma_x = np.sqrt(covariance[0, 0])
sigma_y = np.sqrt(covariance[1, 1])
rho_xy = covariance[0, 1] / (sigma_x * sigma_y)
covariance_ok = (
sigma_x >= sigma_cl_min
and sigma_x <= sigma_cl_max
and sigma_y >= sigma_cl_min
and sigma_y <= sigma_cl_max
and abs(rho_xy) <= rho_cl_max
)
mux, muy = cluster.mu()
xy_ok = self.in_usable_region[int(muy), int(mux)]
accept_cluster = cluster.number_of_pixels() >= minimum_pixels and covariance_ok and xy_ok
if accept_cluster:
star_number += 1
star = cluster
star.set_cluster_number(star_number)
mux, muy = star.mu()
self.mux_list.append(mux)
self.mux_list.append(muy)
self.star_list.append(star)
# Reorder in decreasing flux
self.star_list.sort(key=lambda x: x.flux(), reverse=True)
star_number = 0
for star in self.star_list:
star.set_cluster_number(star_number)
star_number += 1
self.mux_list = [star.mu()[0] for star in self.star_list]
self.muy_list = [star.mu()[1] for star in self.star_list]
# Show summary table and plot of stars found
print("Stars for image", self.fits_file)
print(
"Number numPix Flux mu_x mu_y sigma_x " + "sigma_y rho_xy"
)
for star in self.star_list:
flux = star.flux()
number_of_pixels = star.number_of_pixels()
mux, muy = star.mu()
covariance = star.cov()
sigma_x = np.sqrt(covariance[0, 0])
sigma_y = np.sqrt(covariance[1, 1])
rho_xy = covariance[0, 1] / (sigma_x * sigma_y)
print(
"{:4d}".format(star.cluster_number),
"{:6d}".format(number_of_pixels),
"{:10.1f}".format(flux),
"{:8.2f}".format(mux),
"{:8.2f}".format(muy),
"{:8.3f}".format(sigma_x),
"{:8.3f}".format(sigma_y),
"{:8.3f}".format(rho_xy),
)
|