Import Log Data from ELK - ELASTIC COMMON SCHEMA

Hello,

I am trying to import log data from ELK that is stored in the Elastic Common Schema (ECS)
(https://www.elastic.co/guide/en/ecs/current/ecs-reference.html)

The aim of this work is a connected, time-based graph that shows the individual events of a "log story". A log story is a single user activity that produces many events, but in ECS these events are not connected to each other. A simple session in my lab, with NGINX (as reverse proxy), Guacamole (as remote web admin console), and other Linux systems, produces many log events, and I am trying to map these events together.

At the top level, one (!) requested event is a nested JSON document like this:

{
  "_type": "_doc",
  "_source": {
    "server": {
      "port": 80,
      "packets": 7,
      "bytes": 1044,
      "ip": "192.168.178.73"
    },
    "process": {
      "name": "nginx",
      "args": [
        "nginx: worker process"
      ],
      "pid": 1698,
      "executable": "/usr/sbin/nginx",
      "created": "2019-11-11T06:22:49.830Z"
    },
    "agent": {
      "hostname": "nginx",
      "id": "28cfb698-1aac-482f-8931-35f3c10c526c",
      "type": "auditbeat",
      "ephemeral_id": "315d5cdd-f403-4ee4-9907-3707c4bf54dd",
      "version": "7.4.2"
    },
    "destination": {
      "port": 80,
      "packets": 7,
      "bytes": 1044,
      "ip": "192.168.178.73"
    },
    "source": {
      "port": 19149,
      "packets": 6,
      "bytes": 326,
      "ip": "x.x.x.x"
    },
    "network": {
      "community_id": "1:1PtDDQARII2dPRid/1LS1AlerUc=",
      "transport": "tcp",
      "type": "ipv4",
      "bytes": 1370,
      "packets": 13,
      "direction": "inbound"
    },
    "@timestamp": "2019-11-20T00:07:29.410Z",
    "system": {
      "audit": {
        "socket": {
          "uid": 33,
          "egid": 33,
          "internal_version": "1.0.3",
          "gid": 33,
          "euid": 33,
          "kernel_sock_address": "0xffff9464e0fce800"
        }
      }
    },
    "ecs": {
      "version": "1.1.0"
    },
    "service": {
      "type": "system"
    },
    "host": {
      "name": "nginx",
      "hostname": "nginx",
      "id": "c9250b1f6acb405da67881552d455ee4",
      "os": {
        "name": "Ubuntu",
        "family": "debian",
        "version": "18.04.3 LTS (Bionic Beaver)",
        "kernel": "4.15.0-66-generic",
        "platform": "ubuntu",
        "codename": "bionic"
      },
      "architecture": "x86_64",
      "containerized": false
    },
    "client": {
      "port": 19149,
      "packets": 6,
      "bytes": 326,
      "ip": "x.x.x.x"
    },
    "event": {
      "duration": 5287690744,
      "kind": "event",
      "module": "system",
      "start": "2019-11-20T00:07:22.087Z",
      "action": "network_flow",
      "end": "2019-11-20T00:07:27.374Z",
      "category": "network_traffic",
      "dataset": "socket"
    },
    "user": {
      "name": "www-data",
      "id": "33"
    },
    "flow": {
      "final": true,
      "complete": true
    },
    "group": {
      "name": "www-data",
      "id": "33"
    }
  },
  "_id": "Xw0ihm4B7Iy8Ibw1rz9s",
  "_index": "auditbeat-7.4.2-2019.11.18-000001",
  "_score": 1.0
}

This event should be merged with the events that follow it.

Today I read the following about nested JSON (Read nested json). ECS events are nested, and I need to connect one ECS event to another.

Is there a way to connect the ECS events, or is there a better way to model the connection between them?

Greetings
Sebastian

3 REPLIES

Hi Sebastian,
As you have read yourself, nested properties are not possible in Neo4j so far. However, that does not mean you cannot model your problem. Whenever you have a nested property, you can create a new node that is connected to the original node, e.g. something like (e:Event)-[:HAS_SOURCE]->(so:Source)-[:HAS_SERVER]->(se:Server {port:80, ...})...
That way you have dedicated nodes, e.g. for servers, so you do not need to duplicate the server information on every event that involves that server.
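
A minimal sketch of this modelling idea, assuming a single hit shaped like the ECS event above (trimmed to a few fields and written inline so it runs without Elasticsearch). The Host label and the ON_HOST relationship type are hypothetical names chosen for illustration, and linking the server directly to the event is a simplification of the pattern above:

```cypher
// Hypothetical single hit, trimmed from the ECS event shown in the question.
WITH {
  _id: 'Xw0ihm4B7Iy8Ibw1rz9s',
  _source: {
    host: {name: 'nginx', id: 'c9250b1f6acb405da67881552d455ee4'},
    server: {ip: '192.168.178.73', port: 80}
  }
} AS hit
// One node per event, keyed by the Elasticsearch document id.
MERGE (e:Event {id: hit._id})
// One node per host, shared by every event that happened on that host.
MERGE (h:Host {id: hit._source.host.id})
  ON CREATE SET h.name = hit._source.host.name
// One node per server endpoint (ip + port), also shared across events.
MERGE (s:Server {ip: hit._source.server.ip, port: hit._source.server.port})
MERGE (e)-[:ON_HOST]->(h)
MERGE (e)-[:HAS_SERVER]->(s)
RETURN e, h, s
```

Because the Host and Server nodes are merged on their own keys, a second event from the same host should reuse the existing nodes, and that shared node is what connects the two events.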
Regards,
Elena

Hello Elena, thanks for the information. I read a bit more in the other posts.

Is there any way to bind each nested JSON object to its own alias with an "AS" clause?

I think https://neo4j.com/docs/labs/apoc/current/database-integration/elasticsearch/ has an example with several JSON objects, possibly nested, but I'm not sure.

I tried the following:

CALL apoc.es.get(
'localhost','BEAT','_search',null,null,
'{   "query":{      "range":{         "@timestamp":{            "gte" : "2019-11-20T00:00:00.000Z",            "lt" :  "2019-11-20T22:30:00.000Z"         }      }   }}'
) 
yield value with 
value.hits.hits.server as j_server, 
value.hits.hits.process as j_process, 
value.hits.hits.agent as j_agent, 
value.hits.hits.destination as j_destination, 
value.hits.hits.source as j_source, 
value.hits.hits.network as j_network, 
value.hits.hits.system as j_system, 
value.hits.hits.ecs as j_ecs, 
value.hits.hits.service as j_service, 
value.hits.hits.host as j_host, 
value.hits.hits.client as j_client, 
value.hits.hits.event as j_event, 
value.hits.hits.user as j_user, 
value.hits.hits.flow as j_flow 
UNWIND j_host as hostJSON
MERGE (host:HOST {name:hostJSON.name}) 
RETURN host

The idea is to use the j_ subnodes separately.

On execution I get an error:

Neo.ClientError.Statement.TypeError
Type mismatch: expected a map but was List {Map{_type -> String("_doc"), ...
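
The error message suggests that value.hits.hits is a list of hits, so a key such as .host cannot be read from it directly. In the sample event, the ECS fields also sit under _source rather than at the top level of each hit. A minimal sketch of the likely fix, assuming an inline map as a stand-in for the value yielded by apoc.es.get (the two sample hits are made up for illustration):

```cypher
// Hypothetical stand-in for the `value` map yielded by apoc.es.get.
WITH {hits: {hits: [
  {_id: 'a1', _source: {host: {name: 'nginx'}}},
  {_id: 'a2', _source: {host: {name: 'guacamole'}}}
]}} AS value
// value.hits.hits is a list, so unwind it into single hits first.
UNWIND value.hits.hits AS hit
// Each hit is a map; the ECS objects are nested under _source.
WITH hit._source.host AS hostJSON
MERGE (host:HOST {name: hostJSON.name})
RETURN host.name
```

The other nested objects (server, process, and so on) can be bound in the same WITH clause, e.g. hit._source.server AS j_server.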

Greetings Sebastian

I found a solution to import the ECS events, but I had to reference each nested JSON field explicitly.
The example covers only server, host, process, destination, source, network, and client.

Some values are null, and MERGE does not accept null property values, so I use coalesce() to replace them with 'none'.

CALL apoc.es.get('localhost','BEAT','_search',null,null,'{"query":{"range":{"@timestamp":{ "gte":"2019-11-20T00:00:00.000Z","lt":"2019-11-20T22:30:00.000Z"}}}}') 
yield value with value.hits.hits as hits
UNWIND hits as hit

MERGE (event:Event {

id: hit._id, 

server_PORT: coalesce(hit._source.server.port,'none'),
server_PACKETS: coalesce(hit._source.server.packets,'none'),
server_BYTES: coalesce(hit._source.server.bytes,'none'),
server_IP: coalesce(hit._source.server.ip,'none'),

host_NAME: coalesce(hit._source.host.name,'none'),
host_HOSTNAME: coalesce(hit._source.host.hostname,'none'),
host_ID: coalesce(hit._source.host.id,'none'),
host_ARCHITECTURE: coalesce(hit._source.host.architecture,'none'),
host_CONTAINERIZED: coalesce(hit._source.host.containerized,'none'),

process_NAME: coalesce(hit._source.process.name,'none'),
process_ARGS: coalesce(hit._source.process.args,'none'),  
process_PID: coalesce(hit._source.process.pid,'none'),
process_EXECUTABLE: coalesce(hit._source.process.executable,'none'),
process_CREATED: coalesce(hit._source.process.created,'none'),
	 
destination_PORT: coalesce(hit._source.destination.port,'none'),
destination_PACKETS: coalesce(hit._source.destination.packets,'none'),
destination_BYTES: coalesce(hit._source.destination.bytes,'none'),
destination_IP: coalesce(hit._source.destination.ip,'none'),

source_PORT: coalesce(hit._source.source.port,'none'),
source_PACKETS: coalesce(hit._source.source.packets,'none'),
source_BYTES: coalesce(hit._source.source.bytes,'none'),
source_IP: coalesce(hit._source.source.ip,'none'),
    
network_COMMUNITY_ID: coalesce(hit._source.network.community_id,'none'),	
network_TRANSPORT: coalesce(hit._source.network.transport,'none'),	
network_TYPE: coalesce(hit._source.network.type,'none'),	
network_BYTES: coalesce(hit._source.network.bytes,'none'),	
network_PACKETS: coalesce(hit._source.network.packets,'none'),	
network_DIRECTION: coalesce(hit._source.network.direction,'none'),	

client_PORT: coalesce(hit._source.client.port,'none'),
client_PACKETS: coalesce(hit._source.client.packets,'none'),
client_BYTES: coalesce(hit._source.client.bytes,'none'),
client_IP: coalesce(hit._source.client.ip,'none')

})  
return event

For the next step, I hope somebody can give me a hint or share some experience.
I'm looking for a graph-based connection between the events, and I'm not sure which approach is best (I call the identifier that links events a shared identifier):

  1. Time-based (is a timeline graph possible?)
  2. Host-based (shared identifier e.g. PPID -> PID ; user login)
  3. Communication-based (shared identifier e.g. client_ip:port -> server_ip:port)
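
As one possible take on the time-based option, the sketch below chains the events of each host into a timeline with NEXT relationships. It assumes each Event node carries a timestamp property (for example taken from hit._source.`@timestamp`, which the import above does not store), and it uses three made-up sample events so that it runs on its own. The NEXT relationship type and the timestamp property name are hypothetical:

```cypher
// Hypothetical sample events standing in for imported ECS events.
UNWIND [
  {id: 'e1', host: 'nginx', ts: '2019-11-20T00:07:22.087Z'},
  {id: 'e2', host: 'nginx', ts: '2019-11-20T00:07:27.374Z'},
  {id: 'e3', host: 'nginx', ts: '2019-11-20T00:07:29.410Z'}
] AS row
MERGE (e:Event {id: row.id})
SET e.host_NAME = row.host,
    e.timestamp = datetime(row.ts)   // assumed property, not in the import above
// Sort by time, then collect the events of each host into an ordered list.
WITH e ORDER BY e.timestamp
WITH e.host_NAME AS host, collect(e) AS events
// Link each event to its direct successor on the same host.
UNWIND range(0, size(events) - 2) AS i
WITH events[i] AS earlier, events[i + 1] AS later
MERGE (earlier)-[:NEXT]->(later)
RETURN earlier.id, later.id
```

For the communication-based option, a similar approach might merge one hub node per network_COMMUNITY_ID value (skipping 'none') and attach every matching event to it, so that events from the same flow become neighbours in the graph.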