Skip to content

osmfile.py

Bases: object

OSM File output.

Parameters:

Name Type Description Default
filespec str

The input or output file

None
options dict

Command line options

None
outdir str

The output directory for the file

'/tmp/'

Returns:

Type Description
OsmFile

An instance of this object

Source code in osm_merge/osmfile.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(
    self,
    filespec: str = None,
    options: dict = None,
    outdir: str = "/tmp/",
):
    """This class reads and writes the OSM XML formatted files.

    Args:
        filespec (str): The output file to create. When None, no file is
            opened and nothing is written.
        options (dict): Command line options
        outdir (str): The output directory for the file (not used by
            this constructor)

    Returns:
        (OsmFile): An instance of this object
    """
    if options is None:
        options = dict()
    self.options = options
    self.version = 3
    self.visible = "true"
    self.osmid = -1
    # Open the OSM output file
    self.file = None
    if filespec is not None:
        # header() declares the document as UTF-8, so the file has to be
        # written as UTF-8 no matter what the locale default encoding is.
        self.file = open(filespec, "w", encoding="utf-8")
        logging.info("Opened output file: " + filespec)
    # header() writes nothing when no file was opened
    self.header()

    # These are for importing the CO addresses
    self.full = None
    self.addr = None
    # Next ID handed out to new features, decremented after each use
    self.start = -1
    # The features collected by loadFile()
    self.data = list()

isclosed

isclosed()

Is the OSM XML file open or closed ?

Returns:

Type Description
bool

The OSM XML file status

Source code in osm_merge/osmfile.py
85
86
87
88
89
90
91
def isclosed(self):
    """Is the OSM XML file open or closed ?

    Returns:
        (bool): True when no file is attached or the attached file has
            been closed, otherwise False
    """
    # self.file is None when no filespec was given and after footer(),
    # so guard against it instead of raising AttributeError.
    if self.file is None:
        return True
    return self.file.closed

header

header()

Write the header of the OSM XML file.

Source code in osm_merge/osmfile.py
93
94
95
96
97
98
def header(self):
    """Emit the XML declaration and the opening <osm> element.

    Nothing is written when no output file is attached.
    """
    out = self.file
    if out is None:
        return
    preamble = (
        "<?xml version='1.0' encoding='UTF-8'?>\n",
        '<osm version="0.6" generator="osm-merge 0.3">\n',
    )
    for line in preamble:
        out.write(line)
    out.flush()

footer

footer()

Write the footer of the OSM XML file.

Source code in osm_merge/osmfile.py
100
101
102
103
104
105
106
107
108
def footer(self):
    """Write the footer of the OSM XML file and close it.

    Does nothing when no output file is attached. Afterwards self.file
    is always None.
    """
    if self.file is not None:
        try:
            self.file.write("</osm>\n")
        finally:
            # Closing flushes any buffered output and releases the handle
            # even if the final write failed.
            self.file.close()
            self.file = None

loadFile

loadFile(osmfile)

Read an OSM XML file and convert it to GeoJson for consistency.

Parameters:

Name Type Description Default
osmfile str

The OSM XML file to load

required

Returns:

Type Description
list

The entries in the OSM XML file

Source code in osm_merge/osmfile.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def loadFile(
    self,
    osmfile: str,
) -> list:
    """
    Read an OSM XML file and convert it to GeoJson for consistency.

    Nodes that carry more than an id and a version, and all ways, are
    appended to self.data as GeoJson features. The geometry of a way is
    built from the coordinates of its referenced nodes that are present
    in the same file; references to other nodes are ignored.

    Args:
        osmfile (str): The OSM XML file to load

    Returns:
        (list): The entries in the OSM XML file (self.data), or False
            when the file has no <osm> root or contains no nodes
    """
    def as_list(item):
        # xmltodict produces a bare dict instead of a list for an element
        # that occurs only once, so normalize to a list before iterating.
        if item is None:
            return []
        if isinstance(item, list):
            return item
        return [item]

    with open(osmfile, "r", encoding="utf-8") as file:
        doc = xmltodict.parse(file.read())
    if "osm" not in doc:
        logging.warning("No data in this instance")
        return False
    # An empty <osm/> element parses to None
    data = doc["osm"] or dict()
    if "node" not in data:
        logging.warning("No nodes in this instance")
        return False

    # Cache of node ID -> Point, used to dereference the refs of the ways
    nodes = dict()
    for node in as_list(data["node"]):
        node_id = int(node["@id"])
        properties = {"id": node_id}
        properties["version"] = node["@version"] if "@version" in node else 1
        if "@timestamp" in node:
            properties["timestamp"] = node["@timestamp"]

        for tag in as_list(node.get("tag")):
            key = tag["@k"]
            # Drop all the TIGER tags based on
            # https://wiki.openstreetmap.org/wiki/TIGER_fixup
            if key.startswith("tiger:"):
                continue
            properties[key] = tag["@v"].strip()

        if "@lon" not in node or "@lat" not in node:
            # Without coordinates there is no geometry to cache or export
            continue
        geom = Point((float(node["@lon"]), float(node["@lat"])))
        # cache the nodes so we can dereference the refs into
        # coordinates, but we don't need them in GeoJson format.
        nodes[node_id] = geom
        # "id" and "version" are always present, so anything beyond
        # those two means the node carries data of its own.
        if len(properties) > 2:
            self.data.append(Feature(geometry=geom, properties=properties))

    for way in as_list(data.get("way")):
        properties = {
            "id": int(way["@id"]),
        }
        refs = [int(ref["@ref"]) for ref in as_list(way.get("nd"))]
        if "nd" in way:
            properties["refs"] = refs

        # The version comes from the way itself, not from the last node
        properties["version"] = way["@version"] if "@version" in way else 1

        for tag in as_list(way.get("tag")):
            properties[tag["@k"]] = tag["@v"].strip()

        # Only nodes that have been cached can contribute coordinates
        coords = [nodes[ref]["coordinates"] for ref in refs if ref in nodes]
        geom = LineString(coords)
        self.data.append(Feature(geometry=geom, properties=properties))

    return self.data

writeOSM

writeOSM(data, filespec)

Write the data to an OSM XML file.

Parameters:

Name Type Description Default
data list

The list of GeoJson features

required
Source code in osm_merge/osmfile.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
def writeOSM(self,
             data: list,
             filespec: str,
             ):
    """
    Write the data to an OSM XML file.

    Point features are written as nodes through createNode(). All other
    features are written as modified ways through createWay(), but only
    when their properties contain a non-empty "refs" entry; features
    without "refs" are skipped. The input features are not modified.

    Args:
        data (list): The list of GeoJson features
        filespec (str): The path of the OSM XML file to create
    """
    import ast

    # Counter for features without any ID. It is kept apart from the IDs
    # read from the data so a new feature never gets an ID derived from
    # the previous feature's real OSM ID. The first ID handed out is -2.
    newid = -1
    outfile = open(filespec, "w", encoding="utf-8")
    self.file = outfile
    try:
        self.header()
        for entry in data:
            # Work on a copy so the caller's feature keeps its properties
            tags = dict(entry["properties"])
            if "osm_id" in tags:
                osmid = tags["osm_id"]
            elif "id" in tags:
                osmid = tags["id"]
            else:
                # There is no id or version for non OSM features
                newid -= 1
                osmid = newid
            version = 1
            if "version" in tags:
                # NOTE(review): createNode() adds 1 to the version again,
                # so nodes end up two above the input version - confirm
                # that this is intended.
                version = int(tags["version"]) + 1
            attrs = {"id": osmid, "version": version}
            # These are OSM attributes, not tags
            tags.pop("id", None)
            tags.pop("version", None)
            item = {"attrs": attrs, "tags": tags}
            out = ""
            geometry = entry["geometry"]
            if geometry is not None and geometry["type"] == "Point":
                attrs["lon"] = geometry["coordinates"][0]
                attrs["lat"] = geometry["coordinates"][1]
                if "timestamp" in tags:
                    attrs["timestamp"] = tags.pop("timestamp")
                out = self.createNode(item, False)
            else:
                # OSM ways don't have a geometry, just references to node IDs.
                # The OSM XML file won't have any nodes, so at first won't
                # display in JOSM until you do a File->"Update modified",
                if "refs" not in tags:
                    # FIXME: for now we don't do anything with new roads from
                    # an external dataset, because that would be an import.
                    continue
                refs = tags["refs"]
                if len(refs) > 0:
                    if isinstance(refs, str):
                        # The refs may arrive as the text of a list, e.g.
                        # "[1, 2]". literal_eval parses such literals
                        # without executing arbitrary code, unlike eval.
                        refs = ast.literal_eval(refs)
                    item["refs"] = refs
                    del tags["refs"]
                    out = self.createWay(item, True)
            if len(out) > 0:
                outfile.write(out + "\n")
        self.footer()
    finally:
        # Never leak the handle, even when a feature fails to convert
        if not outfile.closed:
            outfile.close()
        self.file = None

createWay

createWay(way, modified=False)

This creates a string that is the OSM representation of a way.

Parameters:

Name Type Description Default
way dict

The input way data structure

required
modified bool

Is this a modified feature ?

False

Returns:

Type Description
str

The OSM XML entry

Source code in osm_merge/osmfile.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
def createWay(
    self,
    way: dict,
    modified: bool = False,
):
    """This creates a string that is the OSM representation of a way.

    The ID is taken from the first of "osm_way_id", "osm_id" or "id"
    found in way["attrs"]; when none is present the next value of
    self.start is used and self.start is decremented.

    Args:
        way (dict): The input way data structure, with an "attrs" dict
            and optionally "refs" (node IDs) and "tags"
        modified (bool): Is this a modified feature ?

    Returns:
        (str): The OSM XML entry
    """
    from datetime import timezone

    attrs = dict()
    source = way["attrs"]

    # Add default attributes
    if modified:
        attrs["action"] = "modify"
    for idkey in ("osm_way_id", "osm_id", "id"):
        if idkey in source:
            attrs["id"] = int(source[idkey])
            break
    else:
        attrs["id"] = self.start
        self.start -= 1
    attrs["version"] = source["version"] if "version" in source else 1
    # The trailing "Z" denotes UTC, so take the time in UTC. The explicit
    # %H:%M:%S is used because %T is not supported on every platform.
    attrs["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # If the resulting file is publicly accessible without authentication, The GDPR applies
    # and the identifying fields should not be included
    if "uid" in source:
        attrs["uid"] = source["uid"]
    if "user" in source:
        attrs["user"] = source["user"]

    # Process attributes. The values are escaped so characters such as
    # & < ' in a user name can not break the XML.
    line = "".join(
        f"{name}='{html.escape(str(value))}' " for name, value in attrs.items()
    )
    parts = ["  <way " + line + ">"]

    if "refs" in way:
        for ref in way["refs"]:
            parts.append(f'\n    <nd ref="{ref}"/>')
    if "tags" in way:
        for key, value in way["tags"].items():
            # Skip empty tags, the "track" entry, and anything already
            # written as an attribute of the way.
            if value is None or key == "track" or key in attrs:
                continue
            newkey = html.escape(key)
            newval = html.escape(str(value))
            parts.append(f"\n    <tag k='{newkey}' v='{newval}'/>")
        if modified:
            parts.append('\n    <tag k="note" v="Do not upload this without validation!"/>')
        parts.append("\n")
    parts.append("  </way>\n")

    return "".join(parts)

featureToNode

featureToNode(feature)

Convert a GeoJson feature into the data structures used here.

Parameters:

Name Type Description Default
feature dict

The GeoJson feature to convert

required

Returns:

Type Description
dict

The data structure used by this file

Source code in osm_merge/osmfile.py
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def featureToNode(
    self,
    feature: dict,
):
    """Turn a GeoJson feature into the attrs/tags structure used here.

    The "id" property becomes the "osm_id" attribute, the "label" and
    "title" properties are dropped, and every other property becomes a
    tag. The first two coordinates of the geometry are stored as the
    "lon" and "lat" attributes.

    Args:
        feature (dict): The GeoJson feature to convert

    Returns:
        (dict): A dict with the "attrs" and "tags" of the feature
    """
    skipped = ("label", "title")
    properties = feature["properties"]
    attrs = dict()
    if "id" in properties:
        attrs["osm_id"] = properties["id"]
    tags = {
        name: value
        for name, value in properties.items()
        if name != "id" and name not in skipped
    }
    position = feature["geometry"]["coordinates"]
    attrs["lat"] = position[1]
    attrs["lon"] = position[0]
    return {"attrs": attrs, "tags": tags}

createNode

createNode(node, modified=False)

This creates a string that is the OSM representation of a node.

Parameters:

Name Type Description Default
node dict

The input node data structure

required
modified bool

Is this a modified feature ?

False

Returns:

Type Description
str

The OSM XML entry

Source code in osm_merge/osmfile.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def createNode(
    self,
    node: dict,
    modified: bool = False,
):
    """This creates a string that is the OSM representation of a node.

    When node["attrs"] has no "id" the next value of self.start is used
    and self.start is decremented. A "version" found in node["attrs"]
    is written incremented by one.

    Args:
        node (dict): The input node data structure, with an "attrs" dict
            holding at least "lat" and "lon", and optionally "tags"
        modified (bool): Is this a modified feature ?

    Returns:
        (str): The OSM XML entry
    """
    from datetime import timezone

    attrs = dict()
    source = node["attrs"]
    # Add default attributes
    if modified:
        attrs["action"] = "modify"

    if "id" in source:
        attrs["id"] = int(source["id"])
    else:
        attrs["id"] = self.start
        self.start -= 1
    if "version" not in source:
        attrs["version"] = "1"
    else:
        attrs["version"] = int(source["version"]) + 1
    attrs["lat"] = source["lat"]
    attrs["lon"] = source["lon"]
    # The trailing "Z" denotes UTC, so take the time in UTC. The explicit
    # %H:%M:%S is used because %T is not supported on every platform.
    attrs["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # If the resulting file is publicly accessible without authentication, THE GDPR applies
    # and the identifying fields should not be included
    if "uid" in source:
        attrs["uid"] = source["uid"]
    if "user" in source:
        attrs["user"] = source["user"]

    # Process attributes. The values are escaped so characters such as
    # & < ' in a user name can not break the XML.
    line = "".join(
        f"{name}='{html.escape(str(value))}' " for name, value in attrs.items()
    )
    parts = ["  <node " + line]

    if "tags" in node:
        parts.append(">")
        for key, value in node["tags"].items():
            # Skip empty values and anything already written as an attribute
            if not value or key in attrs:
                continue
            newkey = html.escape(key)
            newval = html.escape(str(value))
            parts.append(f"\n    <tag k='{newkey}' v='{newval}'/>")
        parts.append("\n  </node>\n")
    else:
        # No tags at all, so the element can be self-closing
        parts.append("/>")

    return "".join(parts)

createTag

createTag(field, value)

Create a data structure for an OSM feature tag.

Parameters:

Name Type Description Default
field str

The tag name

required
value str

The value for the tag

required

Returns:

Type Description
dict

The newly created tag pair

Source code in osm_merge/osmfile.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def createTag(
    self,
    field: str,
    value: str,
):
    """Build a single-entry dict holding one OSM tag.

    The value is converted to a string, every "&" is replaced by "and"
    and double quotes are removed. When the cleaned value contains "=",
    the text before the first "=" becomes the tag name and the text
    between the first and second "=" becomes the value.

    Args:
        field (str): The tag name
        value (str): The value for the tag

    Returns:
        (dict): The newly created tag pair
    """
    cleaned = str(value).replace("&", "and").replace('"', "")
    pieces = cleaned.split("=")
    if len(pieces) > 1:
        # The value carries its own "name=value" pair, which wins
        return {pieces[0]: pieces[1]}
    return {field: cleaned}

dump

dump()

Dump internal data structures, for debugging purposes only.

Source code in osm_merge/osmfile.py
500
501
502
503
504
505
506
507
508
def dump(self):
    """Print every loaded feature and its properties, for debugging only.

    A feature whose geometry type is Point is labelled "Node", anything
    else is labelled "Way".
    """
    for feature in self.data:
        label = "Node" if feature["geometry"]["type"] == 'Point' else "Way"
        print(label)
        props = feature["properties"]
        for name in props:
            print(f"\t{name} = {props[name]}")

getFeature

getFeature(id)

Get the data for a feature from the loaded OSM data file.

Parameters:

Name Type Description Default
id int

The ID to retrieve the feature of

required

Returns:

Type Description
dict

The feature for this ID or None

Source code in osm_merge/osmfile.py
510
511
512
513
514
515
516
517
518
519
520
521
522
def getFeature(
    self,
    id: int,
):
    """Get the data for a feature from the loaded OSM data file.

    self.data is subscripted directly with the given value.
    NOTE(review): the constructor creates self.data as a list, so the
    value acts as a position in that list rather than as an OSM ID -
    confirm what the callers pass.

    Args:
        id (int): The ID to retrieve the feature of

    Returns:
        (dict): The feature for this ID or None
    """
    try:
        return self.data[id]
    except (IndexError, KeyError):
        # Nothing stored for this ID; return None as documented
        return None

getFields

getFields()

Extract all the tags used in this file.

Source code in osm_merge/osmfile.py
524
525
526
527
528
529
530
531
def getFields(self):
    """Extract all the tags used in this file.

    self.data may be the list of features built by loadFile() or a dict
    of items keyed by ID. The keys are read from the "tags" entry of an
    item when it has one, otherwise from its "properties".

    Returns:
        (list): Each key once, in the order it was first seen
    """
    data = self.data
    items = data.values() if isinstance(data, dict) else data
    # A dict keeps insertion order and drops duplicates in one pass
    fields = dict()
    for item in items:
        source = item["tags"] if "tags" in item else item["properties"]
        fields.update(dict.fromkeys(source))
    return list(fields)

options: show_source: false heading_level: 3