gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[robocop] branch master updated: add script to convert XML input into saner JSON


From: Admin
Subject: [robocop] branch master updated: add script to convert XML input into saner JSON
Date: Sat, 07 Jun 2025 23:30:38 +0200

This is an automated email from the git hooks/post-receive script.

grothoff pushed a commit to branch master
in repository robocop.

The following commit(s) were added to refs/heads/master by this push:
     new fb27b3e  add script to convert XML input into saner JSON
fb27b3e is described below

commit fb27b3edb21d058307ce9031670056ac29e6e698
Author: Christian Grothoff <christian@grothoff.org>
AuthorDate: Sat Jun 7 23:30:28 2025 +0200

    add script to convert XML input into saner JSON
---
 robocop-ch-to-json       | 1003 ++++++++++++++++++++++++++++++++++++++++++++++
 robocop-json-postprocess |    4 +
 2 files changed, 1007 insertions(+)

diff --git a/robocop-ch-to-json b/robocop-ch-to-json
new file mode 100755
index 0000000..ce9e3e8
--- /dev/null
+++ b/robocop-ch-to-json
@@ -0,0 +1,1003 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# robocop-ch-to-json
+#
+# Copyright (C) 2025 Taler Systems SA
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+"""
+Swiss Sanctions XML to JSON Converter
+
+This program converts Swiss sanctions XML files (following the 
swiss-sanctions-list XSD)
+to JSON format, mapping XML elements to registry identifiers and inlining all 
references.
+
+Features:
+- Converts targets (individuals, entities, objects) to self-contained JSON 
records
+- Maps XML elements to registry identifiers from GANA registry
+- Inlines place references (location, area, country)
+- Flattens name parts and includes spelling variants
+- Handles multiple identities and addresses
+- Preserves all identification documents and relationships
+
+Usage:
+    robocop-ch-to-json < input.xml > output.json
+"""
+
+import xml.etree.ElementTree as ET
+import json
+import sys
+from datetime import datetime
+from typing import Dict, List, Any, Optional
+import argparse
+import re
+
+class SwissSanctionsConverter:
+    """Converts Swiss sanctions XML to JSON format with registry mapping."""
+
+    def __init__(self):
+        # Registry mapping from XML elements to standardized identifiers
+        self.registry_mapping = {
+            # Personal information
+            'given-name': 'PERSON_FIRST_NAMES',
+            'further-given-name': 'PERSON_FIRST_NAMES',
+            'family-name': 'PERSON_LAST_NAME',
+            'maiden-name': 'PERSON_LAST_NAME',
+            'whole-name': 'FULL_NAME',
+            'day-month-year': 'DATE_OF_BIRTH',
+            'nationality': 'NATIONALITY',
+            'identification-document': 'PERSON_NATIONAL_ID',
+
+            # Business information
+            'entity_name': 'COMPANY_NAME',
+            'business_name': 'BUSINESS_DISPLAY_NAME',
+
+            # Address information
+            'address-details': 'ADDRESS_LINES',
+            'zip-code': 'ADDRESS_ZIPCODE',
+            'c-o': 'ADDRESS_LINES',
+            'p-o-box': 'ADDRESS_LINES',
+
+            # Contact information
+            'contact-person': 'CONTACT_PERSON_NAME',
+        }
+
+    def parse_xml(self, xml_file: str) -> ET.Element:
+        """Parse the XML file and return the root element."""
+        try:
+            if xml_file:
+                tree = ET.parse(xml_file)
+            else:
+                tree = ET.parse(sys.stdin)
+            return tree.getroot()
+        except ET.ParseError as e:
+            raise ValueError(f"Invalid XML file: {e}")
+        except FileNotFoundError:
+            raise FileNotFoundError(f"XML file not found: {xml_file}")
+
+    def _parse_element(self, element: ET.Element) -> Dict[str, Any]:
+        """Recursively parse XML element to dictionary."""
+        result = {}
+
+        # Add attributes
+        if element.attrib:
+            result.update(element.attrib)
+            # Convert numeric attributes to integers where appropriate
+            for key, value in element.attrib.items():
+                if key in ['ssid', 'day', 'month', 'year', 'place-id', 
'target-id', 'order']:
+                    try:
+                        result[key] = int(value)
+                    except ValueError:
+                        pass  # Keep as string if conversion fails
+                elif key in ['main', 'current']:
+                    result[key] = value.lower() == 'true'
+
+        # Handle text content
+        if element.text and element.text.strip():
+            if len(element) == 0:  # Leaf node with text only
+                return element.text.strip()
+            else:  # Mixed content
+                result['_text'] = element.text.strip()
+
+        # Process child elements
+        children_by_tag = {}
+        for child in element:
+            tag = child.tag
+            child_data = self._parse_element(child)
+
+            if tag not in children_by_tag:
+                children_by_tag[tag] = []
+            children_by_tag[tag].append(child_data)
+
+        # Add children to result
+        for tag, children in children_by_tag.items():
+            if len(children) == 1:
+                result[tag] = children[0]
+            else:
+                result[tag] = children
+
+        return result
+
+    def build_place_lookup(self, root: ET.Element) -> Dict[str, Dict[str, 
Any]]:
+        """Build a lookup dictionary for place references."""
+        places = {}
+
+        for place_elem in root.findall('place'):
+            ssid = place_elem.get('ssid')
+            if ssid:
+                place_data = {
+                    'location': None,
+                    'location_variants': [],
+                    'area': None,
+                    'area_variants': [],
+                    'country': None,
+                    'country_code': None
+                }
+
+                # Extract location
+                location_elem = place_elem.find('location')
+                if location_elem is not None and location_elem.text:
+                    place_data['location'] = location_elem.text.strip()
+
+                # Extract location variants
+                for variant in place_elem.findall('location-variant'):
+                    if variant.text:
+                        place_data['location_variants'].append({
+                            'value': variant.text.strip(),
+                            'type': variant.get('variant-type', 'unknown')
+                        })
+
+                # Extract area
+                area_elem = place_elem.find('area')
+                if area_elem is not None and area_elem.text:
+                    place_data['area'] = area_elem.text.strip()
+
+                # Extract area variants
+                for variant in place_elem.findall('area-variant'):
+                    if variant.text:
+                        place_data['area_variants'].append({
+                            'value': variant.text.strip(),
+                            'type': variant.get('variant-type', 'unknown')
+                        })
+
+                # Extract country
+                country_elem = place_elem.find('country')
+                if country_elem is not None:
+                    place_data['country'] = country_elem.text.strip() if 
country_elem.text else None
+                    place_data['country_code'] = country_elem.get('iso-code')
+
+                places[ssid] = place_data
+
+        return places
+
+    def resolve_place(self, place_id: str, places_lookup: Dict[str, Dict]) -> 
Dict[str, List[str]]:
+        """Resolve a place reference and return flattened address 
components."""
+        if place_id not in places_lookup:
+            return {}
+
+        place = places_lookup[place_id]
+        result = {}
+
+        # Add country information
+        if place['country_code']:
+            result['ADDRESS_COUNTRY'] = [place['country_code']]
+
+        # Add location (town/city)
+        locations = []
+        if place['location']:
+            locations.append(place['location'])
+        for variant in place['location_variants']:
+            locations.append(variant['value'])
+        if locations:
+            result['ADDRESS_TOWN_LOCATION'] = locations
+
+        # Add area (district/subdivision)
+        areas = []
+        if place['area']:
+            areas.append(place['area'])
+        for variant in place['area_variants']:
+            areas.append(variant['value'])
+        if areas:
+            result['ADDRESS_COUNTRY_SUBDIVISION'] = areas
+
+        return result
+
+    def extract_names(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
+        """Extract and flatten name information from an identity element."""
+        result = {
+            'PERSON_FIRST_NAMES': [],
+            'PERSON_LAST_NAME': [],
+            'FULL_NAME': []
+        }
+
+        for name_elem in identity_elem.findall('name'):
+            # Process name parts
+            name_parts = []
+            first_names = []
+            last_names = []
+
+            for name_part in name_elem.findall('name-part'):
+                part_type = name_part.get('name-part-type', '')
+                value_elem = name_part.find('value')
+
+                if value_elem is not None and value_elem.text:
+                    value = value_elem.text.strip()
+                    name_parts.append(value)
+
+                    # Categorize name parts
+                    if part_type in ['given-name', 'further-given-name']:
+                        first_names.append(value)
+                    elif part_type in ['family-name', 'maiden-name']:
+                        last_names.append(value)
+                    elif part_type == 'whole-name':
+                        result['FULL_NAME'].append(value)
+
+                    # Add spelling variants
+                    for variant in name_part.findall('spelling-variant'):
+                        if variant.text:
+                            variant_value = variant.text.strip()
+                            if part_type in ['given-name', 
'further-given-name']:
+                                first_names.append(variant_value)
+                            elif part_type in ['family-name', 'maiden-name']:
+                                last_names.append(variant_value)
+                            elif part_type == 'whole-name':
+                                result['FULL_NAME'].append(variant_value)
+
+            # Add categorized names
+            result['PERSON_FIRST_NAMES'].extend(first_names)
+            result['PERSON_LAST_NAME'].extend(last_names)
+
+            # If we have separate parts but no whole name, combine them
+            if name_parts and not any(part.get('name-part-type') == 
'whole-name'
+                                    for part in 
name_elem.findall('name-part')):
+                full_name = ' '.join(name_parts)
+                result['FULL_NAME'].append(full_name)
+
+        # Remove duplicates while preserving order
+        for key in result:
+            seen = set()
+            result[key] = [x for x in result[key] if not (x in seen or 
seen.add(x))]
+
+        return result
+
+    def extract_birth_info(self, identity_elem: ET.Element) -> Dict[str, 
List[str]]:
+        """Extract birth date and nationality information."""
+        result = {}
+
+        # Extract birth dates
+        birth_dates = []
+        for dmy_elem in identity_elem.findall('day-month-year'):
+            day = dmy_elem.get('day')
+            month = dmy_elem.get('month')
+            year = dmy_elem.get('year')
+
+            date_parts = []
+            if year:
+                date_parts.append(year)
+            if month:
+                date_parts.append(f"{int(month):02d}")
+            if day:
+                date_parts.append(f"{int(day):02d}")
+
+            if date_parts:
+                # Format as ISO date if complete, otherwise partial
+                if len(date_parts) == 3:
+                    
birth_dates.append(f"{date_parts[0]}-{date_parts[1]}-{date_parts[2]}")
+                else:
+                    birth_dates.append('-'.join(date_parts))
+
+        if birth_dates:
+            result['DATE_OF_BIRTH'] = birth_dates
+
+        # Extract nationalities
+        nationalities = []
+        for nat_elem in identity_elem.findall('nationality'):
+            country_elem = nat_elem.find('country')
+            if country_elem is not None:
+                country_code = country_elem.get('iso-code')
+                if country_code:
+                    nationalities.append(country_code)
+
+        if nationalities:
+            result['NATIONALITY'] = nationalities
+
+        return result
+
+    def extract_addresses(self, identity_elem: ET.Element, places_lookup: 
Dict[str, Dict]) -> Dict[str, List[str]]:
+        """Extract address information from identity element."""
+        result = {}
+
+        for addr_elem in identity_elem.findall('address'):
+            place_id = addr_elem.get('place-id')
+
+            # Resolve place reference
+            if place_id:
+                place_info = self.resolve_place(place_id, places_lookup)
+                for key, values in place_info.items():
+                    if key not in result:
+                        result[key] = []
+                    result[key].extend(values)
+
+            # Extract address details
+            details_elem = addr_elem.find('address-details')
+            if details_elem is not None and details_elem.text:
+                if 'ADDRESS_LINES' not in result:
+                    result['ADDRESS_LINES'] = []
+                result['ADDRESS_LINES'].append(details_elem.text.strip())
+
+            # Extract zip code
+            zip_elem = addr_elem.find('zip-code')
+            if zip_elem is not None and zip_elem.text:
+                if 'ADDRESS_ZIPCODE' not in result:
+                    result['ADDRESS_ZIPCODE'] = []
+                result['ADDRESS_ZIPCODE'].append(zip_elem.text.strip())
+
+            # Extract c/o
+            co_elem = addr_elem.find('c-o')
+            if co_elem is not None and co_elem.text:
+                if 'ADDRESS_LINES' not in result:
+                    result['ADDRESS_LINES'] = []
+                result['ADDRESS_LINES'].append(f"c/o {co_elem.text.strip()}")
+
+            # Extract P.O. Box
+            po_elem = addr_elem.find('p-o-box')
+            if po_elem is not None and po_elem.text:
+                if 'ADDRESS_LINES' not in result:
+                    result['ADDRESS_LINES'] = []
+                result['ADDRESS_LINES'].append(f"P.O. Box 
{po_elem.text.strip()}")
+
+        return result
+
+    def extract_identification_documents(self, identity_elem: ET.Element, 
places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
+        """Extract identification document information."""
+        result = {}
+
+        for doc_elem in identity_elem.findall('identification-document'):
+            doc_type = doc_elem.get('document-type', 'unknown')
+
+            # Extract document number
+            number_elem = doc_elem.find('number')
+            if number_elem is not None and number_elem.text:
+                doc_info = f"{doc_type}: {number_elem.text.strip()}"
+
+                # Add issuer information
+                issuer_elem = doc_elem.find('issuer')
+                if issuer_elem is not None:
+                    issuer_code = issuer_elem.get('code')
+                    if issuer_code:
+                        doc_info += f" (issued by {issuer_code})"
+
+                # Add dates if available
+                issue_date = doc_elem.find('date-of-issue')
+                expiry_date = doc_elem.find('expiry-date')
+                if issue_date is not None and issue_date.text:
+                    doc_info += f" issued: {issue_date.text}"
+                if expiry_date is not None and expiry_date.text:
+                    doc_info += f" expires: {expiry_date.text}"
+
+                if 'PERSON_NATIONAL_ID' not in result:
+                    result['PERSON_NATIONAL_ID'] = []
+                result['PERSON_NATIONAL_ID'].append(doc_info)
+
+        return result
+
+    def process_individual(self, individual_elem: ET.Element, places_lookup: 
Dict[str, Dict]) -> Dict[str, List[str]]:
+        """Process an individual target and extract all relevant 
information."""
+        result = {}
+
+        # Process all identities
+        for identity_elem in individual_elem.findall('identity'):
+            # Extract names
+            names = self.extract_names(identity_elem)
+            for key, values in names.items():
+                if key not in result:
+                    result[key] = []
+                result[key].extend(values)
+
+            # Extract birth information
+            birth_info = self.extract_birth_info(identity_elem)
+            for key, values in birth_info.items():
+                if key not in result:
+                    result[key] = []
+                result[key].extend(values)
+
+            # Extract addresses
+            addresses = self.extract_addresses(identity_elem, places_lookup)
+            for key, values in addresses.items():
+                if key not in result:
+                    result[key] = []
+                result[key].extend(values)
+
+            # Extract identification documents
+            id_docs = self.extract_identification_documents(identity_elem, 
places_lookup)
+            for key, values in id_docs.items():
+                if key not in result:
+                    result[key] = []
+                result[key].extend(values)
+
+        # Remove duplicates
+        for key in result:
+            seen = set()
+            result[key] = [x for x in result[key] if not (x in seen or 
seen.add(x))]
+
+        return result
+
+    def process_entity(self, entity_elem: ET.Element, places_lookup: Dict[str, 
Dict]) -> Dict[str, List[str]]:
+        """Process an entity target and extract all relevant information."""
+        result = {}
+
+        # Process all identities
+        for identity_elem in entity_elem.findall('identity'):
+            # Extract entity names
+            names = self.extract_names(identity_elem)
+            # Map entity names to business identifiers
+            if names.get('FULL_NAME'):
+                result['COMPANY_NAME'] = names['FULL_NAME']
+                result['BUSINESS_DISPLAY_NAME'] = names['FULL_NAME'].copy()
+
+            # Extract addresses (registered office)
+            addresses = self.extract_addresses(identity_elem, places_lookup)
+            # Map to registered office address for entities
+            for key, values in addresses.items():
+                if 'OFFICE' not in key:
+                    new_key = key.replace('ADDRESS_', 
'REGISTERED_OFFICE_ADDRESS_')
+                else:
+                    new_key = key
+                if new_key not in result:
+                    result[new_key] = []
+                result[new_key].extend(values)
+
+        # Remove duplicates
+        for key in result:
+            if isinstance(result[key], list):
+                seen = set()
+                result[key] = [x for x in result[key] if not (x in seen or 
seen.add(x))]
+
+        return result
+
+    def process_object(self, object_elem: ET.Element, places_lookup: Dict[str, 
Dict]) -> Dict[str, List[str]]:
+        """Process an object target and extract all relevant information."""
+        result = {}
+        object_type = object_elem.get('object-type', 'unknown')
+
+        # Process all identities
+        for identity_elem in object_elem.findall('identity'):
+            # Extract object names
+            names = self.extract_names(identity_elem)
+            if names.get('FULL_NAME'):
+                # Use a generic name field for objects
+                result['FULL_NAME'] = names['FULL_NAME']
+                # Add object type information
+                object_names = [f"{name} ({object_type})" for name in 
names['FULL_NAME']]
+                result['BUSINESS_DISPLAY_NAME'] = object_names
+
+        # Add object type as additional information
+        if 'FULL_NAME' not in result:
+            result['FULL_NAME'] = [f"Unknown {object_type}"]
+
+        return result
+
    def _is_target_active(self, target: Dict[str, Any]) -> bool:
        """Return True unless the target's most recent modification de-lists it.

        The "most recent" modification is chosen by comparing one date per
        modification, preferring effective-date over enactment-date over
        publication-date.  Dates must be in '%Y-%m-%d' format; modifications
        with unparseable dates are ignored.
        """

        if 'modification' not in target:
            return True  # No modifications, consider active

        modifications = target['modification']
        # _parse_element stores a single child directly (not in a list),
        # so normalize to a list before iterating.
        if not isinstance(modifications, list):
            modifications = [modifications]

        # Find the most recent modification by its best available date field.
        most_recent = None
        most_recent_date = None

        for mod in modifications:
            mod_type = mod.get('modification-type', '')  # NOTE(review): unused in this loop

            # Pick one date per modification, by priority:
            # effective-date > enactment-date > publication-date.
            date_str = None
            if 'effective-date' in mod:
                date_str = mod['effective-date']
            elif 'enactment-date' in mod:
                date_str = mod['enactment-date']
            elif 'publication-date' in mod:
                date_str = mod['publication-date']

            if date_str:
                try:
                    mod_date = datetime.strptime(date_str, '%Y-%m-%d')
                    if most_recent_date is None or mod_date > most_recent_date:
                        most_recent_date = mod_date
                        most_recent = mod
                except ValueError:
                    continue  # Skip invalid dates
            elif most_recent is None:
                # Dateless fallback: the FIRST dateless modification is
                # remembered (any later dated modification still overrides it).
                most_recent = mod

        if most_recent is None:
            return True  # No valid modification found, consider active

        return most_recent.get('modification-type') != 'de-listed'
+
+    def process_target(self, target_elem: ET.Element, places_lookup: Dict[str, 
Dict]) -> Optional[Dict[str, Any]]:
+        """Process a single target element and return JSON representation."""
+        ssid = target_elem.get('ssid')
+        if not ssid:
+            return None
+
+        # Base target information
+        target_data = {
+            'ssid': ssid,
+            'sanctions_set_ids': [],
+            'foreign_identifier': None,
+            'target_type': None,
+            'justification': [],
+            'relations': [],
+            'other_information': [],
+            'PERSON_NATIONAL_ID': [],
+            'DATE_OF_BIRTH': [],
+            'CONTACT_EMAIL': [],
+            'CONTACT_PHONE': [],
+            'COMMERCIAL_REGISTER_NUMBER': [],
+            'FOUNDING_DATE': [],
+            'generic_attributes': {}
+        }
+
+        # Extract sanctions set IDs
+        for ss_id_elem in target_elem.findall('sanctions-set-id'):
+            if ss_id_elem.text:
+                
target_data['sanctions_set_ids'].append(ss_id_elem.text.strip())
+
+        # Extract foreign identifier
+        foreign_id_elem = target_elem.find('foreign-identifier')
+        if foreign_id_elem is not None and foreign_id_elem.text:
+            target_data['foreign_identifier'] = foreign_id_elem.text.strip()
+
+        # Process target type and extract specific information
+        registry_data = {}
+
+        individual_elem = target_elem.find('individual')
+        entity_elem = target_elem.find('entity')
+        object_elem = target_elem.find('object')
+
+        if individual_elem is not None:
+            target_data['target_type'] = 'individual'
+            target_data['sex'] = individual_elem.get('sex')
+            registry_data = self.process_individual(individual_elem, 
places_lookup)
+
+            # Extract justifications
+            for just_elem in individual_elem.findall('justification'):
+                if just_elem.text:
+                    target_data['justification'].append(just_elem.text.strip())
+
+            # Extract relations
+            for rel_elem in individual_elem.findall('relation'):
+                relation_info = {
+                    'target_id': rel_elem.get('target-id'),
+                    'relation_type': rel_elem.get('relation-type'),
+                    'remark': None
+                }
+                remark_elem = rel_elem.find('remark')
+                if remark_elem is not None and remark_elem.text:
+                    relation_info['remark'] = remark_elem.text.strip()
+                target_data['relations'].append(relation_info)
+
+            # Extract other information
+            for other_elem in individual_elem.findall('other-information'):
+                if other_elem.text:
+                    # "other-information" is very messy. We try our best to 
match
+                    # it against various regular expressions and extract bits.
+                    oi = other_elem.text.strip()
+                    found = False;
+                    match = re.search(r'Passport Number:\s*([A-Za-z0-9]+)', 
oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'([A-Za-z])*\s*national 
number:\s*([A-Za-z0-9]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'Personal ID:\s*([A-Za-z0-9]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'National ID:\s*([A-Za-z0-9]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'National ID\.:\s*([A-Za-z0-9]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'National identification 
number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'National identification 
no:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'Personal 
identification:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = 
re.search(r'Passport:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = 
re.search(r'Passport\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'ID Card 
Number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'Passport or ID 
number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'National 
ID:\s*([A-Za-z0-9]+)\s*;\s*Passport:\s*([A-Za-z0-9()]+)', oi, re.IGNORECASE)
+                    nnum = match.group(1) if match else None
+                    if nnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(nnum)
+                        found = True
+                    pnum = match.group(2) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'State Identification 
Number\s*([A-Za-z()]*)\s*:\s*([A-Za-z0-9]+)', oi, re.IGNORECASE)
+                    pnum = match.group(2) if match else None
+                    if pnum is not None:
+                        target_data['PERSON_NATIONAL_ID'].append(pnum)
+                        found = True
+                    match = re.search(r'e-mail:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'email:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'e-mail address:\s*([A-Za-z0-9@]+)', 
oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'email address:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'Tel.:\s*([A-Za-z0-9() +-]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'Phone:\s*([A-Za-z0-9() +-]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'Tel. \(office\):\s*([A-Za-z0-9() 
+-]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'DOB:\s*([A-Za-z0-9:\. -]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['DATE_OF_BIRTH'].append(pnum)
+                        found = True
+                    match = re.search(r'Date range: DOB 
between\s*([A-Za-z0-9:\. -]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['DATE_OF_BIRTH'].append(pnum)
+                        found = True
+                    if not found:
+                        target_data['other_information'].append(oi)
+
+        elif entity_elem is not None:
+            target_data['target_type'] = 'entity'
+            registry_data = self.process_entity(entity_elem, places_lookup)
+
+            # Extract justifications, relations, other info (same structure as 
individual)
+            for just_elem in entity_elem.findall('justification'):
+                if just_elem.text:
+                    target_data['justification'].append(just_elem.text.strip())
+
+            for rel_elem in entity_elem.findall('relation'):
+                relation_info = {
+                    'target_id': rel_elem.get('target-id'),
+                    'relation_type': rel_elem.get('relation-type'),
+                    'remark': None
+                }
+                remark_elem = rel_elem.find('remark')
+                if remark_elem is not None and remark_elem.text:
+                    relation_info['remark'] = remark_elem.text.strip()
+                target_data['relations'].append(relation_info)
+
+            for other_elem in entity_elem.findall('other-information'):
+                if other_elem.text:
+                    # "other-information" is very messy. We try our best to 
match
+                    # it against various regular expressions and extract bits.
+                    oi = other_elem.text.strip()
+                    found = False;
+                    match = re.search(r'Tel.:\s*([A-Za-z0-9() +-]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'Company phone:\s*([A-Za-z0-9() +-]+)', 
oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'Phone:\s*([A-Za-z0-9() +-]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_PHONE'].append(pnum)
+                        found = True
+                    match = re.search(r'e-mail:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'e-mail address:\s*([A-Za-z0-9@]+)', 
oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'email address:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'company email:\s*([A-Za-z0-9@]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['CONTACT_EMAIL'].append(pnum)
+                        found = True
+                    match = re.search(r'Date of 
registration:\s*([A-Za-z0-9\/\.]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['FOUNDING_DATE'].append(pnum)
+                        found = True
+                    match = 
re.search(r'([A-Za-z]*)\s*Number([A-Za-z()]*)\s:\s*([A-Za-z0-9 -]+)', oi, 
re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+                        found = True
+                    match = re.search(r'Registration no:\s*([A-Za-z0-9 -]+)', 
oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+                        found = True
+                    match = re.search(r'Registration Number:\s*([A-Za-z0-9 
-]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+                        found = True
+                    if not found:
+                        target_data['other_information'].append(oi)
+
+        elif object_elem is not None:
+            target_data['target_type'] = 'other'
+            target_data['object_type'] = object_elem.get('object-type')
+            registry_data = self.process_object(object_elem, places_lookup)
+
+            # Extract justifications, relations, other info (same structure)
+            for just_elem in object_elem.findall('justification'):
+                if just_elem.text:
+                    target_data['justification'].append(just_elem.text.strip())
+
+            for rel_elem in object_elem.findall('relation'):
+                relation_info = {
+                    'target_id': rel_elem.get('target-id'),
+                    'relation_type': rel_elem.get('relation-type'),
+                    'remark': None
+                }
+                remark_elem = rel_elem.find('remark')
+                if remark_elem is not None and remark_elem.text:
+                    relation_info['remark'] = remark_elem.text.strip()
+                target_data['relations'].append(relation_info)
+
+            for other_elem in object_elem.findall('other-information'):
+                if other_elem.text:
+                    # "other-information" is very messy. We try our best to 
match
+                    # it against various regular expressions and extract bits.
+                    oi = other_elem.text.strip()
+                    found = False
+                    match = re.search(r'Registration no\.:\s*([A-Za-z0-9 
-]+)', oi, re.IGNORECASE)
+                    pnum = match.group(1) if match else None
+                    if pnum is not None:
+                        target_data['COMMERCIAL_REGISTER_NUMBER'].append(pnum)
+                        found = True
+                    if not found:
+                        target_data['other_information'].append(oi)
+
+        # Extract generic attributes
+        for attr_elem in target_elem.findall('generic-attribute'):
+            attr_name = attr_elem.get('name')
+            if attr_name and attr_elem.text:
+                target_data['generic_attributes'][attr_name] = 
attr_elem.text.strip()
+
+        # Merge registry data into target data
+        target_data.update(registry_data)
+
+        return target_data
+
+    def convert_xml_to_json(self, xml_file: str, active_only: bool = False) -> 
Dict[str, Any]:
+        """Convert Swiss sanctions XML file to JSON format."""
+        root = self.parse_xml(xml_file)
+
+        # Build place lookup
+        places_lookup = self.build_place_lookup(root)
+
+        # Extract metadata
+        metadata = {
+            'list_type': root.get('list-type'),
+            'date': root.get('date'),
+            'conversion_timestamp': datetime.now().isoformat(),
+            'total_targets': 0,
+            'total_places': len(places_lookup)
+        }
+
+        # Process sanctions programs
+        programs = []
+        for program_elem in root.findall('sanctions-program'):
+            program_data = {
+                'ssid': program_elem.get('ssid'),
+                'version_date': program_elem.get('version-date'),
+                'predecessor_version_date': 
program_elem.get('predecessor-version-date'),
+                'program_keys': {},
+                'program_names': {},
+                'sanctions_sets': {},
+                'origin': None
+            }
+
+            # Extract program keys
+            for key_elem in program_elem.findall('program-key'):
+                lang = key_elem.get('lang')
+                if lang and key_elem.text:
+                    program_data['program_keys'][lang] = key_elem.text.strip()
+
+            # Extract program names
+            for name_elem in program_elem.findall('program-name'):
+                lang = name_elem.get('lang')
+                if lang and name_elem.text:
+                    program_data['program_names'][lang] = 
name_elem.text.strip()
+
+            # Extract sanctions sets
+            for set_elem in program_elem.findall('sanctions-set'):
+                lang = set_elem.get('lang')
+                ssid = set_elem.get('ssid')
+                if lang and ssid and set_elem.text:
+                    if ssid not in program_data['sanctions_sets']:
+                        program_data['sanctions_sets'][ssid] = {}
+                    program_data['sanctions_sets'][ssid][lang] = 
set_elem.text.strip()
+
+            # Extract origin
+            origin_elem = program_elem.find('origin')
+            if origin_elem is not None and origin_elem.text:
+                program_data['origin'] = origin_elem.text.strip()
+
+            programs.append(program_data)
+
+        # Process targets
+        targets = []
+
+        # Filter targets if active_only is requested
+        if active_only and 'target' in root:
+            print(f"Filtering for active targets", file=sys.stderr)
+            targets = root['target'] if isinstance(root['target'], list) else 
[root['target']]
+            active_targets = [target for target in targets if 
self._is_target_active(target)]
+
+            if active_targets:
+                root['target'] = active_targets if len(active_targets) > 1 
else active_targets[0]
+            else:
+                # Remove targets key if no active targets
+                del root['target']
+
+        for target_elem in root.findall('target'):
+            # The "_is_target_active" logic expects JSON, convert first
+            data = self._parse_element (target_elem)
+            if self._is_target_active(data) or not active_only:
+               target_data = self.process_target(target_elem, places_lookup)
+            else:
+               target_data = None
+            if target_data:
+                targets.append(target_data)
+
+        metadata['total_targets'] = len(targets)
+
+        # Build final JSON structure
+        result = {
+            'metadata': metadata,
+            'sanctions_programs': programs,
+            'targets': targets,
+            'places': places_lookup
+        }
+
+        return result
+
def main():
    """Command-line entry point: parse arguments, run the conversion, emit JSON.

    Reads the XML file named by --input, converts it, and writes the
    resulting targets array to --output (or stdout). Exits with status 1
    on any error, printing the message to stderr.
    """
    parser = argparse.ArgumentParser(
        description='Convert Swiss sanction list from XML to JSON format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
    robocop-ch-to-json --active < sanctions.xml > sanctions.json
    robocop-ch-to-json --input sanctions.xml --output sanctions.json
        """
    )

    parser.add_argument('--active', action='store_true',
                       help='Only include active targets (exclude de-listed)')
    parser.add_argument('--input', help='Input XML file path')
    parser.add_argument('--output', '-o', help='Output JSON file path (default: stdout)')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--indent', type=int, default=2, help='JSON indentation level (default: 2)')

    args = parser.parse_args()

    try:
        converter = SwissSanctionsConverter()

        # Convert XML to the full JSON structure (metadata, programs,
        # targets, places).
        json_data = converter.convert_xml_to_json(args.input, args.active)

        # Only the targets array is emitted; the rest stays internal.
        json_result = json_data['targets']

        # Output to file or stdout.
        try:
            if args.output:
                with open(args.output, 'w', encoding='utf-8') as f:
                    json.dump(json_result, f, indent=args.indent, ensure_ascii=False)
                # Print after the with-block so the file is flushed/closed
                # before we announce success.
                print(f"Successfully converted XML to JSON: {args.output}", file=sys.stderr)
            else:
                json.dump(json_result, sys.stdout, indent=args.indent, ensure_ascii=False)
        except IOError as e:
            # Chain the original cause so the traceback stays useful.
            raise IOError(f"Failed to write JSON output: {e}") from e

        if args.verbose:
            print("Conversion completed successfully!", file=sys.stderr)
            print(f"Total targets: {json_data['metadata']['total_targets']}", file=sys.stderr)
            print(f"Total places: {json_data['metadata']['total_places']}", file=sys.stderr)
            print(f"Total programs: {len(json_data['sanctions_programs'])}", file=sys.stderr)

    except Exception as e:
        # Top-level CLI boundary: report and exit non-zero.
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
+
+
# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()
diff --git a/robocop-json-postprocess b/robocop-json-postprocess
new file mode 100755
index 0000000..73f02ae
--- /dev/null
+++ b/robocop-json-postprocess
@@ -0,0 +1,4 @@
#!/bin/sh
# This script is in the public domain.
# It removes empty arrays, objects and null values from the JSON data structure it is given.
#
# jq's walk() visits every value recursively; for each object we keep only
# the entries whose value is neither an empty array, an empty object, nor
# null. Non-object values pass through unchanged. Reads stdin, writes stdout.
exec jq 'walk(if type == "object" then with_entries(select(.value != [] and .value != {} and .value != null)) else . end)'

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]