Skip to content

osmfile.py

Bases: object

OSM File output.

Parameters:

Name Type Description Default
filespec str

The input or output file

None
options dict

Command line options

None
outdir str

The output directory for the file

'/tmp/'

Returns:

Type Description
OsmFile

An instance of this object

Source code in osm_merge/osmfile.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(
    self,
    filespec: str = None,
    options: dict = None,
    outdir: str = "/tmp/",
):
    """Set up the state used to read and write OSM XML formatted files.

    Args:
        filespec (str): The input or output file; when given it is opened
            for reading
        options (dict): Command line options
        outdir (str): The output directory for the file (not used by this
            constructor)

    Returns:
        (OsmFile): An instance of this object
    """
    # Fall back to an empty option set when the caller supplies none.
    self.options = dict() if options is None else options

    self.version = 3
    self.visible = "true"
    # Negative ID given to the next feature that has no ID of its own.
    self.osmid = -1

    # The handle stays unset unless a file was named.
    self.file = None
    if filespec is not None:
        self.file = open(filespec, "r")
        logging.info("Opened input file: " + filespec)

    # These are for importing the CO addresses
    self.full = None
    self.addr = None

    # Features collected while loading a file
    self.data = list()

isclosed

isclosed()

Is the OSM XML file open or closed ?

Returns:

Type Description
bool

The OSM XML file status

Source code in osm_merge/osmfile.py
85
86
87
88
89
90
91
def isclosed(self):
    """Is the OSM XML file open or closed ?

    When there is no file handle at all (none was ever opened, or it has
    been released and reset to None) the file is reported as closed
    instead of raising an AttributeError.

    Returns:
        (bool): The OSM XML file status, True when it is closed
    """
    if self.file is None:
        return True
    return self.file.closed

header

header()

Write the header of the OSM XML file.

Source code in osm_merge/osmfile.py
93
94
95
96
97
98
def header(self):
    """Emit the XML declaration and the opening <osm> element."""
    handle = self.file
    if handle is None:
        # No file is open, so there is nothing to write to.
        return
    for line in (
        "<?xml version='1.0' encoding='UTF-8'?>\n",
        '<osm version="0.6" generator="osm-merge 0.3">\n',
    ):
        handle.write(line)
    handle.flush()

footer

footer()

Write the footer of the OSM XML file.

Source code in osm_merge/osmfile.py
100
101
102
103
104
105
106
107
108
def footer(self):
    """Write the footer of the OSM XML file, then close and release it.

    Does nothing apart from resetting the handle when no file is open.
    """
    if self.file is not None:
        try:
            self.file.write("</osm>\n")
            self.file.flush()
        finally:
            # The previous test, `self.file is False`, could never be true
            # for an open handle, so the file was dropped without ever
            # being closed.
            self.file.close()
    self.file = None

loadFile

loadFile(osmfile)

Read an OSM XML file and convert it to GeoJson for consistency.

Parameters:

Name Type Description Default
osmfile str

The OSM XML file to load

required

Returns:

Type Description
list

The entries in the OSM XML file

Source code in osm_merge/osmfile.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def loadFile(
    self,
    osmfile: str,
) -> list:
    """
    Read an OSM XML file and convert it to GeoJson for consistency.

    Every way becomes a LineString Feature built from the coordinates of
    the nodes it references that are present in the file. A node becomes
    a Point Feature only when it carries something besides its id and
    version. The Features are appended to self.data, so repeated calls
    accumulate.

    Args:
        osmfile (str): The OSM XML file to load

    Returns:
        (list): The entries in the OSM XML file, or False when the file
            has no <osm> element or no nodes
    """
    def as_list(value):
        """Normalise an xmltodict child, which is a dict when it occurs once."""
        if value is None:
            return list()
        if isinstance(value, list):
            return value
        return [value]

    with open(osmfile, "r") as file:
        doc = xmltodict.parse(file.read())
    if "osm" not in doc:
        logging.warning("No data in this instance")
        return False
    data = doc["osm"]
    # An empty <osm/> element is parsed as None rather than a dict
    if not isinstance(data, dict) or "node" not in data:
        logging.warning("No nodes in this instance")
        return False

    nodes = dict()
    for node in as_list(data["node"]):
        properties = {
            "id": int(node["@id"]),
            "version": node.get("@version", 1),
        }
        if "@timestamp" in node:
            properties["timestamp"] = node["@timestamp"]

        for tag in as_list(node.get("tag")):
            key = tag["@k"]
            # Drop all the TIGER tags based on
            # https://wiki.openstreetmap.org/wiki/TIGER_fixup
            # The test has to be on the tag name; the previous check
            # compared a stored value against a 7 character slice of the
            # 6 character prefix, so it never matched.
            if key.startswith("tiger:"):
                continue
            properties[key] = tag["@v"].strip()

        geom = Point((float(node["@lon"]), float(node["@lat"])))
        # cache the nodes so we can dereference the refs into
        # coordinates, but we don't need them in GeoJson format.
        nodes[properties["id"]] = geom
        if len(properties) > 2:
            self.data.append(Feature(geometry=geom, properties=properties))

    # A file may legitimately contain nodes only
    for way in as_list(data.get("way")):
        properties = {
            "id": int(way["@id"]),
        }
        refs = [int(ref["@ref"]) for ref in as_list(way.get("nd"))]
        if "nd" in way:
            properties["refs"] = refs

        # Read the attributes of the way itself. The previous code read
        # them from whichever node was processed last.
        properties["version"] = way.get("@version", 1)
        if "@timestamp" in way:
            properties["timestamp"] = way["@timestamp"]

        for tag in as_list(way.get("tag")):
            properties[tag["@k"]] = tag["@v"].strip()

        # Only nodes that have been cached can be turned into coordinates
        coords = [nodes[ref]["coordinates"] for ref in refs if ref in nodes]
        geom = LineString(coords)
        self.data.append(Feature(geometry=geom, properties=properties))

    return self.data

writeOSM

writeOSM(data, filespec)

Write the data to an OSM XML file.

Parameters:

Name Type Description Default
data list

The list of GeoJson features

required
Source code in osm_merge/osmfile.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
def writeOSM(self,
             data: list,
             filespec: str,
             ):
    """
    Write the data to an OSM XML file.

    Every entry is written as a way. Entries with a geometry also get
    their nodes generated; entries without one must carry a "refs"
    property listing existing node IDs, otherwise they are skipped. The
    input features are not modified.

    Args:
        data (list): The list of GeoJson features, or a FeatureCollection
        filespec (str): The path of the OSM XML file to write

    Raises:
        SystemExit: When an entry is reached and nothing has been
            generated for the output so far
    """
    import ast  # local import: only needed to parse stringified refs

    if isinstance(data, FeatureCollection):
        indata = data["features"]
    else:
        indata = data

    nodes = list()
    ways = list()
    # header() declares the file as UTF-8, so write it with that encoding
    outfile = open(filespec, "w", encoding="utf-8")
    self.file = outfile
    try:
        self.header()
        spin = Bar('Processing output file...', max=len(indata))
        for entry in indata:
            spin.next()
            # Work on a copy so the caller's properties stay intact
            tags = dict(entry["properties"] or dict())
            version = 1
            if "version" in tags:
                version = int(tags["version"]) + 1
            # NOTE(review): no id is passed on to createWay(), so every
            # way is written with a fresh negative ID even when the
            # feature has an "id" or "osm_id" - confirm this is intended.
            attrs = {"version": version}
            # These are OSM attributes, not tags
            tags.pop("id", None)
            tags.pop("version", None)
            if "timestamp" in tags:
                attrs["timestamp"] = tags.pop("timestamp")
            item = {"attrs": attrs, "tags": tags}

            # GeoJson input files have a geometry.
            if entry["geometry"] is not None:
                refnodes, refs = self.geom_to_nodes(entry)
                if refnodes:
                    nodes.append(refnodes)
                item["refs"] = refs
                ways.append(self.createWay(item, True) + '\n')
            else:
                # OSM ways don't have a geometry, just references to node
                # IDs. The OSM XML file won't have any nodes, so at first
                # won't display in JOSM until you do a
                # File->"Update modified".
                if "refs" not in tags:
                    # FIXME: for now we don't do anything with new roads
                    # from an external dataset, because that would be an
                    # import.
                    continue
                if len(tags["refs"]) > 0:
                    refs = tags.pop("refs")
                    if isinstance(refs, str):
                        # The refs arrive as the text of a Python list.
                        # literal_eval() parses it without executing
                        # arbitrary code the way eval() would.
                        refs = ast.literal_eval(refs)
                    item["refs"] = refs
                    tags.pop("lat", None)
                    tags.pop("lon", None)
                    ways.append(self.createWay(item, True))
            if not nodes and not ways:
                logging.error(
                    "Nothing was generated for %s, giving up", filespec
                )
                raise SystemExit(1)
        spin.finish()
        # OSM prefers all the nodes are first in the file
        outfile.write("".join(nodes) + "\n")
        outfile.write("".join(ways) + "\n")
        self.footer()
    finally:
        # Make sure the handle is released, also when an entry fails
        if not outfile.closed:
            outfile.close()
        self.file = None

createWay

createWay(way, modified=False)

This creates a string that is the OSM representation of a way.

Parameters:

Name Type Description Default
way dict

The input way data structure

required
modified bool

Is this a modified feature ?

False

Returns:

Type Description
str

The OSM XML entry

Source code in osm_merge/osmfile.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
def createWay(
    self,
    way: dict,
    modified: bool = False,
):
    """This creates a string that is the OSM representation of a way.

    The ID comes from the first of "osm_way_id", "osm_id" or "id" found
    in the attributes; without any of them the next negative ID is used.
    The timestamp is always the current time in UTC.

    Args:
        way (dict): The input way data structure, with an "attrs" dict
            and optional "refs" and "tags" entries
        modified (bool): Is this a modified feature ?

    Returns:
        (str): The OSM XML entry
    """
    # Local import, the rest of the block only needs the datetime class
    from datetime import timezone

    source = way["attrs"]
    attrs = dict()

    # Add default attributes
    if modified:
        attrs["action"] = "modify"
    for name in ("osm_way_id", "osm_id", "id"):
        if name in source:
            attrs["id"] = int(source[name])
            break
    else:
        attrs["id"] = self.osmid
        self.osmid -= 1
    attrs["version"] = source.get("version", 1)
    # The trailing Z means UTC, so the time has to be taken in UTC. %T is
    # not available on every platform, so spell the time format out.
    attrs["timestamp"] = datetime.now(timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    # If the resulting file is publicly accessible without authentication,
    # the GDPR applies and the identifying fields should not be included
    if "uid" in source:
        attrs["uid"] = source["uid"]
    if "user" in source:
        attrs["user"] = source["user"]

    # Process attributes. The values are XML escaped; formatting them with
    # %r produced Python escapes and left & and < untouched.
    line = "".join(
        f"{name}='{html.escape(str(value))}' " for name, value in attrs.items()
    )
    parts = ["  <way " + line + ">"]

    if "refs" in way:
        parts.extend(f'\n    <nd ref="{ref}"/>' for ref in way["refs"])
    if "tags" in way:
        for key, value in way["tags"].items():
            # Skip unset values, the internal "track" entry, and anything
            # already written as an attribute
            if value is None or key == "track" or key in attrs:
                continue
            newkey = html.escape(key)
            newval = html.escape(str(value))
            parts.append(f"\n    <tag k='{newkey}' v='{newval}'/>")
        if modified:
            parts.append(
                '\n    <tag k="note" v="Do not upload this without validation!"/>'
            )
        parts.append("\n")
    parts.append("  </way>\n")

    return "".join(parts)

featureToNode

featureToNode(feature)

Convert a GeoJson feature into the data structures used here.

Parameters:

Name Type Description Default
feature dict

The GeoJson feature to convert

required

Returns:

Type Description
dict

The data structure used by this file

Source code in osm_merge/osmfile.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
def featureToNode(
    self,
    feature: dict,
):
    """Turn a GeoJson feature into the attrs/tags structure used here.

    The "id" property becomes the "osm_id" attribute, the "label" and
    "title" properties are dropped, and every other property becomes a
    tag. The coordinates supply the "lat" and "lon" attributes.

    Args:
        feature (dict): The GeoJson feature to convert

    Returns:
        (dict): The data structure used by this file
    """
    skipped = ("label", "title")
    props = feature["properties"]

    attrs = {}
    tags = {}
    for name, val in props.items():
        if name == "id":
            attrs["osm_id"] = val
            continue
        if name in skipped:
            continue
        tags[name] = val

    # GeoJson stores a position as (longitude, latitude)
    position = feature["geometry"]["coordinates"]
    attrs["lat"] = position[1]
    attrs["lon"] = position[0]

    return {"attrs": attrs, "tags": tags}

createNode

createNode(node, modified=False)

This creates a string that is the OSM representation of a node.

Parameters:

Name Type Description Default
node dict

The input node data structure

required
modified bool

Is this a modified feature ?

False

Returns:

Type Description
str

The OSM XML entry

Source code in osm_merge/osmfile.py
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
def createNode(
    self,
    node: dict,
    modified: bool = False,
):
    """This creates a string that is the OSM representation of a node.

    A node without an "id" attribute is given the next negative ID. An
    existing "version" attribute is incremented by one. The timestamp is
    always the current time in UTC.

    Args:
        node (dict): The input node data structure, with an "attrs" dict
            holding at least "lat" and "lon", and an optional "tags" dict
        modified (bool): Is this a modified feature ?

    Returns:
        (str): The OSM XML entry
    """
    # Local import, the rest of the block only needs the datetime class
    from datetime import timezone

    source = node["attrs"]
    attrs = dict()
    # Add default attributes
    if modified:
        attrs["action"] = "modify"

    if "id" in source:
        attrs["id"] = int(source["id"])
    else:
        attrs["id"] = self.osmid
        self.osmid -= 1
    if "version" not in source:
        attrs["version"] = "1"
    else:
        attrs["version"] = int(source["version"]) + 1
    attrs["lat"] = source["lat"]
    attrs["lon"] = source["lon"]
    # The trailing Z means UTC, so the time has to be taken in UTC. %T is
    # not available on every platform, so spell the time format out.
    attrs["timestamp"] = datetime.now(timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    # If the resulting file is publicly accessible without authentication,
    # the GDPR applies and the identifying fields should not be included
    if "uid" in source:
        attrs["uid"] = source["uid"]
    if "user" in source:
        attrs["user"] = source["user"]

    # Process attributes. The values are XML escaped; formatting them with
    # %r produced Python escapes and left & and < untouched.
    line = "".join(
        f"{name}='{html.escape(str(value))}' " for name, value in attrs.items()
    )
    parts = ["  <node " + line]

    if "tags" in node:
        parts.append(">")
        for key, value in node["tags"].items():
            # Empty values are skipped, but a numeric zero is real data
            # (layer=0 for example) and must be kept.
            is_number = isinstance(value, (int, float)) and not isinstance(
                value, bool
            )
            if not value and not is_number:
                continue
            if key not in attrs:
                newkey = html.escape(key)
                newval = html.escape(str(value))
                parts.append(f"\n    <tag k='{newkey}' v='{newval}'/>")
        parts.append("\n  </node>\n")
    else:
        parts.append("/>")

    return "".join(parts)

createTag

createTag(field, value)

Create a data structure for an OSM feature tag.

Parameters:

Name Type Description Default
field str

The tag name

required
value str

The value for the tag

required

Returns:

Type Description
dict

The newly created tag pair

Source code in osm_merge/osmfile.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def createTag(
    self,
    field: str,
    value: str,
):
    """Create a data structure for an OSM feature tag.

    Ampersands in the value are replaced by "and" and double quotes are
    removed. A value of the form "name=value" overrides the tag name: the
    text before the first "=" becomes the name and everything after it
    the value.

    Args:
        field (str): The tag name
        value (str): The value for the tag

    Returns:
        (dict): The newly created tag pair
    """
    newval = str(value).replace("&", "and").replace('"', "")
    newtag = field
    if "=" in newval:
        # Split once only, so a value containing further "=" characters
        # is kept whole instead of being cut at the second one.
        newtag, newval = newval.split("=", 1)
    return {newtag: newval}

dump

dump()

Dump internal data structures, for debugging purposes only.

Source code in osm_merge/osmfile.py
514
515
516
517
518
519
520
521
522
def dump(self):
    """Print every loaded feature with its properties, for debugging only."""
    for feature in self.data:
        # Anything that is not a Point is reported as a way
        label = "Node" if feature["geometry"]["type"] == 'Point' else "Way"
        print(label)
        props = feature["properties"]
        for name in props:
            print(f"\t{name} = {props[name]}")

getFeature

getFeature(id)

Get the data for a feature from the loaded OSM data file.

Parameters:

Name Type Description Default
id int

The ID to retrieve the feature of

required

Returns:

Type Description
dict

The feature for this ID or None

Source code in osm_merge/osmfile.py
524
525
526
527
528
529
530
531
532
533
534
535
536
def getFeature(
    self,
    id: int,
):
    """Get the data for a feature from the loaded OSM data file.

    The loaded data is a list of features, so the feature is found by
    comparing the "id" property of each one; indexing the list with an
    OSM ID picked an unrelated entry or raised an IndexError. Data held
    in a dict keyed by ID is looked up directly.

    Args:
        id (int): The ID to retrieve the feature of

    Returns:
        (dict): The feature for this ID or None
    """
    if isinstance(self.data, dict):
        return self.data.get(id)
    for feature in self.data:
        properties = feature.get("properties") or dict()
        if properties.get("id") == id:
            return feature
    return None

getFields

getFields()

Extract all the tags used in this file.

Source code in osm_merge/osmfile.py
538
539
540
541
542
543
544
545
def getFields(self):
    """Extract all the tags used in this file.

    The loaded data is a list of features whose tags are stored under
    "properties". A dict of items that each hold a "tags" dict is also
    accepted. The names are reported exactly as stored, so entries such
    as "id" or "version" are included when they are among the properties.

    Returns:
        (list): Every distinct tag name, in the order first seen
    """
    if isinstance(self.data, dict):
        items = self.data.values()
    else:
        items = self.data

    # A dict keeps insertion order and makes the duplicate check cheap
    fields = dict()
    for item in items:
        tags = item.get("tags")
        if tags is None:
            tags = item.get("properties") or dict()
        fields.update(dict.fromkeys(tags))
    return list(fields)

geom_to_nodes

geom_to_nodes(feature)

Convert the geometry from a GeoJson Feature to OSM XML.

Parameters:

Name Type Description Default
feature Feature

The GeoJson feature to get the geometry from

required

Returns:

Name Type Description
out str

The OSM XML output for this Feature

refs list

The list of node references

Source code in osm_merge/osmfile.py
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
def geom_to_nodes(self,
                feature: "Feature",
                ):
    """
    Convert the geometry from a GeoJson Feature to OSM XML.

    A node is generated for every coordinate of a Point, LineString or
    MultiLineString. Any other geometry type produces no nodes.

    Args:
        feature (Feature):  The GeoJson feature to get the geometry from

    Returns:
        out (str): The OSM XML output for this Feature
        refs (list): The list of node references
    """
    geometry = feature["geometry"]
    gtype = geometry["type"]
    out = list()
    refs = list()

    def add_node(coord):
        """Generate the XML for one coordinate and record its node ID."""
        # The item has no id of its own, so createNode() uses self.osmid
        # and then decrements it; record that ID before the call.
        refs.append(self.osmid)
        item = {"attrs": {"version": 1, "lon": coord[0], "lat": coord[1]}}
        out.append(self.createNode(item, False) + '\n')

    if gtype == "Point":
        # This branch used to pass an undefined item to createNode()
        add_node(geometry["coordinates"])
    elif gtype == "LineString":
        for coord in geometry["coordinates"]:
            add_node(coord)
    elif gtype == "MultiLineString":
        # Each member line is handled like a LineString
        for line in geometry["coordinates"]:
            for coord in line:
                add_node(coord)
    return "".join(out), refs

options: show_source: false heading_level: 3