01-04-2020 12:23 AM
Hello,
I am trying to import log data from ELK that follows the Elastic Common Schema (ECS)
(https://www.elastic.co/guide/en/ecs/current/ecs-reference.html)
The aim of this work is a connected, time-based graph that shows the single events of a "log story". A log story is a single user activity that produces many events, but these events are not connected to each other in ECS. A simple session in my lab, with NGINX (as reverse proxy) + Guacamole (as remote web admin console) + other Linux systems, produces many log events, and I am trying to map these events together.
At the top level, a single (!) requested event is a nested JSON document like this:
{
"_type": "_doc",
"_source": {
"server": {
"port": 80,
"packets": 7,
"bytes": 1044,
"ip": "192.168.178.73"
},
"process": {
"name": "nginx",
"args": [
"nginx: worker process"
],
"pid": 1698,
"executable": "/usr/sbin/nginx",
"created": "2019-11-11T06:22:49.830Z"
},
"agent": {
"hostname": "nginx",
"id": "28cfb698-1aac-482f-8931-35f3c10c526c",
"type": "auditbeat",
"ephemeral_id": "315d5cdd-f403-4ee4-9907-3707c4bf54dd",
"version": "7.4.2"
},
"destination": {
"port": 80,
"packets": 7,
"bytes": 1044,
"ip": "192.168.178.73"
},
"source": {
"port": 19149,
"packets": 6,
"bytes": 326,
"ip": "x.x.x.x"
},
"network": {
"community_id": "1:1PtDDQARII2dPRid/1LS1AlerUc=",
"transport": "tcp",
"type": "ipv4",
"bytes": 1370,
"packets": 13,
"direction": "inbound"
},
"@timestamp": "2019-11-20T00:07:29.410Z",
"system": {
"audit": {
"socket": {
"uid": 33,
"egid": 33,
"internal_version": "1.0.3",
"gid": 33,
"euid": 33,
"kernel_sock_address": "0xffff9464e0fce800"
}
}
},
"ecs": {
"version": "1.1.0"
},
"service": {
"type": "system"
},
"host": {
"name": "nginx",
"hostname": "nginx",
"id": "c9250b1f6acb405da67881552d455ee4",
"os": {
"name": "Ubuntu",
"family": "debian",
"version": "18.04.3 LTS (Bionic Beaver)",
"kernel": "4.15.0-66-generic",
"platform": "ubuntu",
"codename": "bionic"
},
"architecture": "x86_64",
"containerized": false
},
"client": {
"port": 19149,
"packets": 6,
"bytes": 326,
"ip": "x.x.x.x"
},
"event": {
"duration": 5287690744,
"kind": "event",
"module": "system",
"start": "2019-11-20T00:07:22.087Z",
"action": "network_flow",
"end": "2019-11-20T00:07:27.374Z",
"category": "network_traffic",
"dataset": "socket"
},
"user": {
"name": "www-data",
"id": "33"
},
"flow": {
"final": true,
"complete": true
},
"group": {
"name": "www-data",
"id": "33"
}
},
"_id": "Xw0ihm4B7Iy8Ibw1rz9s",
"_index": "auditbeat-7.4.2-2019.11.18-000001",
"_score": 1.0
}
This event should be merged with the events that follow it.
Today I read the following about nested JSON (Read nested json). ECS events are nested, and I need to connect one ECS event to another.
Is there any solution for connecting the ECS events, or is there a better way to get a connection between these events?
Greetings
Sebastian
01-04-2020 05:55 AM
Hi Sebastian,
As you have read yourself, nested properties are not possible in Neo4j so far. However, that does not mean that you cannot model your problem. Whenever you have a nested property, you can create a new node that is connected to the original node, e.g. something like (e:Event)-[:HAS_SOURCE]->(so:Source)-[:HAS_SERVER]->(se:Server {port:80, ...})...
That way you have specific nodes, e.g. for servers, which means that you do not need to duplicate the server information on every event that involves that server.
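As a rough illustration of that idea, the sketch below splits one event into an Event node plus separate Host and Server nodes. The inline map is a shortened stand-in for one ECS event, and the labels and relationship types (Host, Server, ON_HOST, HAS_SERVER) are assumptions chosen for the example, not a fixed schema:

```cypher
// Shortened stand-in for one ECS event (values taken from the sample above).
WITH {
  id: 'Xw0ihm4B7Iy8Ibw1rz9s',
  host: {id: 'c9250b1f6acb405da67881552d455ee4', name: 'nginx'},
  server: {ip: '192.168.178.73', port: 80}
} AS event
MERGE (e:Event {id: event.id})
// One node per distinct host / server, shared by all events that mention it.
MERGE (h:Host {id: event.host.id})
  ON CREATE SET h.name = event.host.name
MERGE (s:Server {ip: event.server.ip, port: event.server.port})
// Hypothetical relationship types; pick names that fit your model.
MERGE (e)-[:ON_HOST]->(h)
MERGE (e)-[:HAS_SERVER]->(s)
RETURN e, h, s
```

Because the host and server nodes are merged on their own keys, a second event from the same host would attach to the existing Host node instead of creating a copy.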
Regards,
Elena
01-04-2020 11:05 AM
Hello Elena, thanks for the information. I have read a bit more in the other descriptions!
Is there any way to bind each nested JSON object to its own alias with a separate "AS"?
I think https://neo4j.com/docs/labs/apoc/current/database-integration/elasticsearch/ has an example with several JSON objects, possibly nested, but I'm not sure.
I tried the following:
CALL apoc.es.get(
'localhost','BEAT','_search',null,null,
'{ "query":{ "range":{ "@timestamp":{ "gte" : "2019-11-20T00:00:00.000Z", "lt" : "2019-11-20T22:30:00.000Z" } } }}'
)
yield value with
value.hits.hits.server as j_server,
value.hits.hits.process as j_process,
value.hits.hits.agent as j_agent,
value.hits.hits.destination as j_destination,
value.hits.hits.source as j_source,
value.hits.hits.network as j_network,
value.hits.hits.system as j_system,
value.hits.hits.ecs as j_ecs,
value.hits.hits.service as j_service,
value.hits.hits.host as j_host,
value.hits.hits.client as j_client,
value.hits.hits.event as j_event,
value.hits.hits.user as j_user,
value.hits.hits.flow as j_flow
UNWIND j_host as hostJSON
MERGE (host:HOST {name:hostJSON.name})
RETURN host
The idea is to use the j_ sub-objects separately.
On execution I get an error:
Neo.ClientError.Statement.TypeError
Type mismatch: expected a map but was List {Map{_type -> String("_doc"), ...
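The error text suggests that `value.hits.hits` is a list of hit maps rather than a single map, so `.server`, `.host` and so on cannot be read from it directly. A minimal sketch of one way around this, assuming the list is unwound first and the ECS fields are read from `_source` (as in the sample event above); the inline `value` map is a made-up, heavily shortened stand-in for the `apoc.es.get` result:

```cypher
// Stand-in for the apoc.es.get result (hypothetical, heavily shortened).
WITH {hits: {hits: [
  {_id: 'Xw0ihm4B7Iy8Ibw1rz9s', _source: {host: {name: 'nginx'}}}
]}} AS value
// hits.hits is a list, so turn it into one row per hit first.
UNWIND value.hits.hits AS hit
// Only now can a nested object be bound to its own alias.
WITH hit._source.host AS j_host
MERGE (host:HOST {name: j_host.name})
RETURN host
```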
Greetings Sebastian
01-11-2020 04:08 AM
I found a solution for importing the ECS events, but I had to reference the nested JSON fields explicitly.
The example contains only server, host, process, destination, source, network and client.
Sometimes the values are null, so I use coalesce() to set them to 'none'.
CALL apoc.es.get('localhost','BEAT','_search',null,null,'{"query":{"range":{"@timestamp":{ "gte":"2019-11-20T00:00:00.000Z","lt":"2019-11-20T22:30:00.000Z"}}}}')
yield value with value.hits.hits as hits
UNWIND hits as hit
MERGE (event:Event {
id: hit._id,
server_PORT: coalesce(hit._source.server.port,'none'),
server_PACKETS: coalesce(hit._source.server.packets,'none'),
server_BYTES: coalesce(hit._source.server.bytes,'none'),
server_IP: coalesce(hit._source.server.ip,'none'),
host_NAME: coalesce(hit._source.host.name,'none'),
host_HOSTNAME: coalesce(hit._source.host.hostname,'none'),
host_ID: coalesce(hit._source.host.id,'none'),
host_ARCHITECTURE: coalesce(hit._source.host.architecture,'none'),
host_CONTAINERIZED: coalesce(hit._source.host.containerized,'none'),
process_NAME: coalesce(hit._source.process.name,'none'),
process_ARGS: coalesce(hit._source.process.args,'none'),
process_PID: coalesce(hit._source.process.pid,'none'),
process_EXECUTEABLE: coalesce(hit._source.process.executable,'none'),
process_CREATED: coalesce(hit._source.process.created,'none'),
destination_PORT: coalesce(hit._source.destination.port,'none'),
destination_PACKETS: coalesce(hit._source.destination.packets,'none'),
destination_BYTES: coalesce(hit._source.destination.bytes,'none'),
destination_IP: coalesce(hit._source.destination.ip,'none'),
source_PORT: coalesce(hit._source.source.port,'none'),
source_PACKETS: coalesce(hit._source.source.packets,'none'),
source_BYTES: coalesce(hit._source.source.bytes,'none'),
source_IP: coalesce(hit._source.source.ip,'none'),
network_COMMUNITY_ID: coalesce(hit._source.network.community_id,'none'),
network_TRANSPORT: coalesce(hit._source.network.transport,'none'),
network_TYPE: coalesce(hit._source.network.type,'none'),
network_BYTES: coalesce(hit._source.network.bytes,'none'),
network_PACKETS: coalesce(hit._source.network.packets,'none'),
network_DIRECTION: coalesce(hit._source.network.direction,'none'),
client_PORT: coalesce(hit._source.client.port,'none'),
client_PACKETS: coalesce(hit._source.client.packets,'none'),
client_BYTES: coalesce(hit._source.client.bytes,'none'),
client_IP: coalesce(hit._source.client.ip,'none')
})
return event
For the next step I hope somebody can give me a hint or has some experience with this.
I'm looking for a graph-based connection between the events, and I'm not sure which way would be best (I call the identifier that links events a shared identifier):
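One possible direction, sketched under loud assumptions: each Event node carries a `shared_id` property (for example a value such as `network.community_id`) and a `timestamp` property, neither of which is stored by the import above. Events with the same shared identifier are then chained in time order with a hypothetical `NEXT` relationship. The three sample rows are made up for illustration:

```cypher
// Made-up sample rows; shared_id and timestamp are assumed properties.
UNWIND [
  {id: 'evt-1', sharedId: 'flow-A', ts: '2019-11-20T00:07:22.087Z'},
  {id: 'evt-2', sharedId: 'flow-A', ts: '2019-11-20T00:07:27.374Z'},
  {id: 'evt-3', sharedId: 'flow-B', ts: '2019-11-20T00:07:29.410Z'}
] AS row
MERGE (e:Event {id: row.id})
SET e.shared_id = row.sharedId, e.timestamp = datetime(row.ts)
// Sort by time, then group the sorted events per shared identifier.
WITH e ORDER BY e.timestamp
WITH e.shared_id AS sharedId, collect(e) AS events
// Link each event to its successor; groups with one event yield no rows.
UNWIND range(0, size(events) - 2) AS i
WITH events[i] AS earlier, events[i + 1] AS later
MERGE (earlier)-[:NEXT]->(later)
RETURN earlier.id AS earlier_id, later.id AS later_id
```

The same pattern could be repeated for other candidate identifiers (host, user, source IP), each with its own relationship type, so that the different "stories" stay distinguishable in the graph.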